/*
 * Decompiled with CFR 0.152.
 */
package org.apache.solr.crossdc.common;

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.common.MirroringException;
import org.apache.solr.crossdc.common.RequestMirroringSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMirroringSink
implements RequestMirroringSink,
Closeable {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final KafkaCrossDcConf conf;
    private final Producer<String, MirroredSolrRequest<?>> producer;
    private final KafkaConsumer<String, MirroredSolrRequest<?>> consumer;
    private final String mainTopic;
    private final String dlqTopic;

    public KafkaMirroringSink(KafkaCrossDcConf conf) {
        this.conf = conf;
        this.producer = this.initProducer();
        this.consumer = this.initConsumer();
        this.mainTopic = conf.get("solr.crossdc.topicName").split(",")[0];
        this.dlqTopic = conf.get("solr.crossdc.dlqTopicName");
        this.checkTopicsAvailability();
    }

    @Override
    public void submit(MirroredSolrRequest<?> request) throws MirroringException {
        this.submitRequest(request, this.mainTopic);
    }

    @Override
    public void submitToDlq(MirroredSolrRequest<?> request) throws MirroringException {
        if (this.dlqTopic != null) {
            this.submitRequest(request, this.dlqTopic);
        } else if (log.isInfoEnabled()) {
            log.info("- no DLQ, dropping failed {}", request);
        }
    }

    private void checkTopicsAvailability() {
        Map topics = this.consumer.listTopics();
        if (this.mainTopic != null && !topics.containsKey(this.mainTopic)) {
            throw new RuntimeException("Main topic " + this.mainTopic + " is not available");
        }
        if (this.dlqTopic != null && !topics.containsKey(this.dlqTopic)) {
            throw new RuntimeException("DLQ topic " + this.dlqTopic + " is not available");
        }
    }

    private void submitRequest(MirroredSolrRequest<?> request, String topicName) throws MirroringException {
        if (log.isDebugEnabled()) {
            log.debug("About to submit a MirroredSolrRequest");
        }
        long enqueueStartNanos = System.nanoTime();
        try {
            this.producer.send(new ProducerRecord(topicName, request), (metadata, exception) -> {
                if (exception != null) {
                    log.error("Failed adding update to CrossDC queue! request={}", request.getSolrRequest(), (Object)exception);
                }
            });
            long lastSuccessfulEnqueueNanos = System.nanoTime();
            long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
            if (elapsedTimeMillis > (long)this.conf.getInt("solr.crossdc.slowSubmitThresholdMs").intValue()) {
                this.slowSubmitAction(request, elapsedTimeMillis);
            }
        }
        catch (Exception e) {
            String message = "Unable to enqueue request " + request + ", configured retries is" + this.conf.getInt("solr.crossdc.numRetries") + " and configured max delivery timeout in ms is " + this.conf.getInt("solr.crossdc.deliveryTimeoutMS");
            log.error(message, (Throwable)e);
            throw new MirroringException(message, e);
        }
    }

    private Producer<String, MirroredSolrRequest<?>> initProducer() {
        Properties kafkaProducerProps = new Properties();
        log.info("Starting CrossDC Producer {}", (Object)this.conf);
        kafkaProducerProps.put("bootstrap.servers", this.conf.get("solr.crossdc.bootstrapServers"));
        kafkaProducerProps.put("acks", "all");
        String retries = this.conf.get("solr.crossdc.numRetries");
        if (retries != null) {
            kafkaProducerProps.put("retries", (Object)Integer.parseInt(retries));
        }
        kafkaProducerProps.put("retry.backoff.ms", this.conf.getInt("solr.crossdc.retryBackoffMs"));
        kafkaProducerProps.put("delivery.timeout.ms", this.conf.getInt("solr.crossdc.deliveryTimeoutMS"));
        kafkaProducerProps.put("max.request.size", this.conf.getInt("solr.crossdc.maxRequestSizeBytes"));
        kafkaProducerProps.put("batch.size", this.conf.getInt("solr.crossdc.batchSizeBytes"));
        kafkaProducerProps.put("buffer.memory", this.conf.getInt("solr.crossdc.bufferMemoryBytes"));
        kafkaProducerProps.put("linger.ms", this.conf.getInt("solr.crossdc.lingerMs"));
        kafkaProducerProps.put("request.timeout.ms", this.conf.getInt("solr.crossdc.requestTimeoutMS"));
        kafkaProducerProps.put("compression.type", this.conf.get("solr.crossdc.enableDataCompression"));
        kafkaProducerProps.put("key.serializer", StringSerializer.class.getName());
        kafkaProducerProps.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
        KafkaCrossDcConf.addSecurityProps(this.conf, kafkaProducerProps);
        kafkaProducerProps.putAll(this.conf.getAdditionalProperties());
        if (log.isDebugEnabled()) {
            log.debug("Kafka Producer props={}", (Object)kafkaProducerProps);
        }
        return new KafkaProducer(kafkaProducerProps);
    }

    private KafkaConsumer<String, MirroredSolrRequest<?>> initConsumer() {
        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.put("bootstrap.servers", this.conf.get("solr.crossdc.bootstrapServers"));
        kafkaConsumerProperties.put("group.id", this.conf.get("solr.crossdc.groupId"));
        kafkaConsumerProperties.put("max.poll.records", this.conf.getInt("solr.crossdc.maxPollRecords"));
        kafkaConsumerProperties.put("max.poll.interval.ms", this.conf.get("solr.crossdc.maxPollIntervalMs"));
        kafkaConsumerProperties.put("session.timeout.ms", this.conf.get("solr.crossdc.sessionTimeoutMs"));
        kafkaConsumerProperties.put("auto.offset.reset", "earliest");
        kafkaConsumerProperties.put("enable.auto.commit", (Object)false);
        kafkaConsumerProperties.put("fetch.min.bytes", this.conf.getInt("solr.crossdc.fetchMinBytes"));
        kafkaConsumerProperties.put("fetch.max.wait.ms", this.conf.getInt("solr.crossdc.fetchMaxWaitMS"));
        kafkaConsumerProperties.put("fetch.max.bytes", this.conf.getInt("solr.crossdc.fetchMaxBytes"));
        kafkaConsumerProperties.put("max.partition.fetch.bytes", this.conf.getInt("solr.crossdc.maxPartitionFetchBytes"));
        kafkaConsumerProperties.put("request.timeout.ms", this.conf.getInt("solr.crossdc.requestTimeoutMS"));
        kafkaConsumerProperties.putAll(this.conf.getAdditionalProperties());
        return new KafkaConsumer(kafkaConsumerProperties, (Deserializer)new StringDeserializer(), (Deserializer)new MirroredSolrRequestSerializer());
    }

    private void slowSubmitAction(Object request, long elapsedTimeMillis) {
        log.warn("Enqueuing the request to Kafka took more than {} millis. enqueueElapsedTime={}", (Object)this.conf.get("solr.crossdc.slowSubmitThresholdMs"), (Object)elapsedTimeMillis);
    }

    @Override
    public void close() throws IOException {
        if (this.producer != null) {
            this.producer.flush();
            this.producer.close();
        }
    }
}

