/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.kafka.buffer;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
import org.opensearch.dataprepper.model.buffer.AbstractBuffer;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.encryption.EncryptionSupplier;
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBufferConfig;
import org.opensearch.dataprepper.plugins.kafka.buffer.serialization.BufferSerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Buffer plugin registered under the name {@code kafka}.
 *
 * <p>Writes are handed to a {@link KafkaCustomProducer}. Reads are served from an in-memory
 * {@link BlockingBuffer} ("inner buffer") that is passed to the {@link KafkaCustomConsumer}
 * instances created for the configured topic; those consumers are submitted to a fixed-size
 * executor owned by this class.
 *
 * <p>Every public operation tags the logging MDC with the plugin type for the duration of the
 * call and removes the tag afterwards.
 */
@DataPrepperPlugin(name="kafka", pluginType=Buffer.class, pluginConfigurationType=KafkaBufferConfig.class)
public class KafkaBuffer
extends AbstractBuffer<Record<Event>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBuffer.class);

    /** How long {@link #shutdown()} waits for the consumer tasks to finish, in seconds. */
    static final long EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = 30L;
    /** Capacity passed to the inner {@link BlockingBuffer}. */
    public static final int INNER_BUFFER_CAPACITY = 1000000;
    /** Batch size passed to the inner {@link BlockingBuffer}. */
    public static final int INNER_BUFFER_BATCH_SIZE = 250000;
    /** Suffix appended to the metric prefix for producer metrics. */
    static final String WRITE = "Write";
    /** Suffix appended to the metric prefix for consumer metrics. */
    static final String READ = "Read";
    /** MDC value identifying this plugin; also used as the default metric-name suffix. */
    static final String MDC_KAFKA_PLUGIN_VALUE = "buffer";
    /** MDC key under which {@link #MDC_KAFKA_PLUGIN_VALUE} is stored. */
    private static final String MDC_KAFKA_PLUGIN_KEY = "kafkaPluginType";
    /** Value reported by {@link #getOptimalRequestSize()}: 1,048,576 (presumably bytes - confirm against the Buffer contract). */
    private static final int OPTIMAL_REQUEST_SIZE = 1024 * 1024;

    private final KafkaCustomProducer producer;
    private final KafkaAdminAccessor kafkaAdminAccessor;
    private final AbstractBuffer<Record<Event>> innerBuffer;
    private final ExecutorService executorService;
    private final Duration drainTimeout;
    private final List<KafkaCustomConsumer> consumers;
    /** Shared with the consumers; set to {@code true} at the start of {@link #shutdown()}. */
    private final AtomicBoolean shutdownInProgress;
    private final ByteDecoder byteDecoder;

    /**
     * Builds the producer, the inner buffer and the topic consumers, then submits every
     * consumer to a newly created executor.
     *
     * <p>When encryption at rest is enabled on the topic and producer properties are present,
     * the configured compression type is captured as a "manual" compression option and the
     * producer's own compression type is overwritten with {@code none}. Note that this mutates
     * the supplied {@code kafkaBufferConfig}.
     *
     * @param pluginSetting             supplies the plugin name and pipeline name used for metrics
     * @param kafkaBufferConfig         buffer configuration (topic, producer properties, drain timeout, metric prefix)
     * @param acknowledgementSetManager forwarded to the consumer factory
     * @param byteDecoder               forwarded to the consumer factory
     * @param awsCredentialsSupplier    forwarded to the producer and consumer factories
     * @param circuitBreaker            forwarded to the consumer factory
     * @param encryptionSupplier        forwarded to the serialization factory
     */
    @DataPrepperPluginConstructor
    public KafkaBuffer(PluginSetting pluginSetting, KafkaBufferConfig kafkaBufferConfig, AcknowledgementSetManager acknowledgementSetManager, ByteDecoder byteDecoder, AwsCredentialsSupplier awsCredentialsSupplier, CircuitBreaker circuitBreaker, EncryptionSupplier encryptionSupplier) {
        super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName() + MDC_KAFKA_PLUGIN_VALUE), pluginSetting.getPipelineName());

        CompressionOption manualCompressionConfig = CompressionOption.NONE;
        if (kafkaBufferConfig.getTopic().encryptionAtRestEnabled() && kafkaBufferConfig.getKafkaProducerProperties() != null) {
            // Remember the requested compression, then switch the producer's own compression off.
            manualCompressionConfig = CompressionOption.fromOptionValue(kafkaBufferConfig.getKafkaProducerProperties().getCompressionType());
            kafkaBufferConfig.getKafkaProducerProperties().setCompressionType(CompressionOption.NONE.name().toLowerCase());
        }

        BufferSerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory(), encryptionSupplier);
        this.byteDecoder = byteDecoder;
        String pipelineName = pluginSetting.getPipelineName();
        String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName());

        KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, new TopicServiceFactory());
        PluginMetrics producerMetrics = PluginMetrics.fromNames(metricPrefixName + WRITE, pipelineName);
        this.producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, false, manualCompressionConfig);

        KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
        this.innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pipelineName);
        this.shutdownInProgress = new AtomicBoolean(false);
        PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pipelineName);
        this.consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), this.innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, this.shutdownInProgress, false, circuitBreaker, manualCompressionConfig);

        this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));

        // One thread per consumer; each consumer is submitted as its own task.
        this.executorService = Executors.newFixedThreadPool(this.consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
        this.consumers.forEach(this.executorService::submit);
        this.drainTimeout = kafkaBufferConfig.getDrainTimeout();
    }

    /**
     * @return the producer's maximum request size, always present
     */
    public Optional<Integer> getMaxRequestSize() {
        return Optional.of(this.producer.getMaxRequestSize());
    }

    /**
     * @return a fixed value of 1,048,576, always present
     */
    public Optional<Integer> getOptimalRequestSize() {
        return Optional.of(OPTIMAL_REQUEST_SIZE);
    }

    /**
     * Sends raw bytes to Kafka through the producer.
     *
     * @param bytes           payload to produce
     * @param key             key passed to the producer alongside the payload
     * @param timeoutInMillis not used by this implementation
     * @throws SizeOverflowException if the failure's cause is a {@link RecordTooLargeException}
     *                               or {@link RecordBatchTooLargeException}
     * @throws Exception             the original exception when it has no cause or is a
     *                               {@link TimeoutException}; any other failure is wrapped in a
     *                               {@link RuntimeException}
     */
    public void writeBytes(byte[] bytes, String key, int timeoutInMillis) throws Exception {
        try {
            setMdc();
            this.producer.produceRawData(bytes, key);
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), e);
            if (e.getCause() == null || e instanceof TimeoutException) {
                throw e;
            }
            Throwable cause = e.getCause();
            if (cause instanceof RecordTooLargeException || cause instanceof RecordBatchTooLargeException) {
                // Only the message is carried over; the full exception was logged above.
                throw new SizeOverflowException(e.getMessage());
            }
            throw new RuntimeException(e);
        }
        finally {
            resetMdc();
        }
    }

    /**
     * Produces a single record to Kafka.
     *
     * <p>Every exception raised by the producer is logged and rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param record          record to produce
     * @param timeoutInMillis not used by this implementation
     * @throws TimeoutException declared by the signature; this body itself only throws
     *                          {@link RuntimeException}
     */
    public void doWrite(Record<Event> record, int timeoutInMillis) throws TimeoutException {
        try {
            setMdc();
            this.producer.produceRecords(record);
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        finally {
            resetMdc();
        }
    }

    /**
     * @return always {@code true}
     */
    public boolean isByteBuffer() {
        return true;
    }

    /**
     * @return always {@code true}
     */
    public boolean areAcknowledgementsEnabled() {
        return true;
    }

    /**
     * Writes each record in iteration order by delegating to {@link #doWrite(Record, int)}.
     * The first failure aborts the loop; records already written are not rolled back here.
     *
     * @param records         records to produce
     * @param timeoutInMillis passed unchanged to every {@code doWrite} call
     * @throws Exception whatever {@code doWrite} throws
     */
    public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) throws Exception {
        for (Record<Event> record : records) {
            this.doWrite(record, timeoutInMillis);
        }
    }

    /**
     * Reads a batch from the inner buffer and, when a result is returned, feeds its records
     * to {@code updateLatency}.
     *
     * @param timeoutInMillis timeout forwarded to the inner buffer's {@code read}
     * @return whatever the inner buffer returned (possibly {@code null})
     */
    public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) {
        try {
            setMdc();
            Map.Entry<Collection<Record<Event>>, CheckpointState> result = this.innerBuffer.read(timeoutInMillis);
            if (result != null) {
                this.updateLatency(result.getKey());
            }
            return result;
        }
        finally {
            resetMdc();
        }
    }

    /**
     * Forwards post-processing to the inner buffer.
     *
     * @param recordsInBuffer value passed through to the inner buffer
     */
    public void postProcess(Long recordsInBuffer) {
        try {
            setMdc();
            this.innerBuffer.postProcess(recordsInBuffer);
        }
        finally {
            resetMdc();
        }
    }

    /**
     * Forwards the checkpoint to the inner buffer.
     *
     * @param checkpointState checkpoint passed through to the inner buffer
     */
    public void doCheckpoint(CheckpointState checkpointState) {
        try {
            setMdc();
            this.innerBuffer.checkpoint(checkpointState);
        }
        finally {
            resetMdc();
        }
    }

    /**
     * @return {@code true} only when the admin accessor reports the topics empty and the inner
     *         buffer is empty; the inner buffer is not consulted if the topics are not empty
     */
    public boolean isEmpty() {
        try {
            setMdc();
            return this.kafkaAdminAccessor.areTopicsEmpty() && this.innerBuffer.isEmpty();
        }
        finally {
            resetMdc();
        }
    }

    /**
     * @return the drain timeout taken from the buffer configuration at construction time
     */
    public Duration getDrainTimeout() {
        return this.drainTimeout;
    }

    /**
     * @return always {@code true}
     */
    public boolean isWrittenOffHeapOnly() {
        return true;
    }

    /**
     * @return the inner buffer's records-in-flight count
     */
    int getInnerBufferRecordsInFlight() {
        return this.innerBuffer.getRecordsInFlight();
    }

    /**
     * Stops the consumer tasks, closes the consumers and shuts down the inner buffer.
     *
     * <p>The shutdown flag shared with the consumers is raised first, then the executor is
     * given {@link #EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT} seconds to finish before
     * {@code shutdownNow()} is used. If the calling thread is interrupted while waiting, the
     * remaining cleanup still runs and the interrupt status is restored before returning.
     */
    public void shutdown() {
        boolean interrupted = false;
        try {
            setMdc();
            this.shutdownInProgress.set(true);
            this.executorService.shutdown();
            try {
                if (this.executorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                    LOG.info("Successfully waited for consumer task to terminate");
                } else {
                    LOG.warn("Consumer task did not terminate in time, forcing termination");
                    this.executorService.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                // Remember the interrupt; it is re-asserted only after cleanup so that closing
                // the consumers below is not affected by a pending interrupt.
                interrupted = true;
                LOG.error("Interrupted while waiting for consumer task to terminate", e);
                this.executorService.shutdownNow();
            }
            LOG.info("Closing {} consumers", this.consumers.size());
            this.consumers.forEach(KafkaCustomConsumer::closeConsumer);
            this.innerBuffer.shutdown();
        }
        finally {
            resetMdc();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Tags the current thread's MDC with this plugin's type. */
    private static void setMdc() {
        MDC.put(MDC_KAFKA_PLUGIN_KEY, MDC_KAFKA_PLUGIN_VALUE);
    }

    /** Removes the plugin-type tag from the current thread's MDC. */
    private static void resetMdc() {
        MDC.remove(MDC_KAFKA_PLUGIN_KEY);
    }
}

