/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.util;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.Consumer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.producer.Producer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.PartitionInfo;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.errors.TimeoutException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.errors.WakeupException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.utils.Time;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.util.Callback;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.util.FutureCallback;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBasedLog<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
    private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30L);
    private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1L);
    private static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15L);
    private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10L);
    private final Time time;
    private final String topic;
    private int partitionCount;
    private final Map<String, Object> producerConfigs;
    private final Map<String, Object> consumerConfigs;
    private final Callback<ConsumerRecord<K, V>> consumedCallback;
    private final Supplier<TopicAdmin> topicAdminSupplier;
    private Consumer<K, V> consumer;
    private Producer<K, V> producer;
    private TopicAdmin admin;
    private Thread thread;
    private boolean stopRequested;
    private final Queue<Callback<Void>> readLogEndOffsetCallbacks;
    private final java.util.function.Consumer<TopicAdmin> initializer;

    @Deprecated
    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Callback<ConsumerRecord<K, V>> consumedCallback, Time time, Runnable initializer) {
        this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null);
    }

    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Supplier<TopicAdmin> topicAdminSupplier, Callback<ConsumerRecord<K, V>> consumedCallback, Time time, java.util.function.Consumer<TopicAdmin> initializer) {
        this.topic = topic;
        this.producerConfigs = producerConfigs;
        this.consumerConfigs = consumerConfigs;
        this.topicAdminSupplier = Objects.requireNonNull(topicAdminSupplier);
        this.consumedCallback = consumedCallback;
        this.stopRequested = false;
        this.readLogEndOffsetCallbacks = new ArrayDeque<Callback<Void>>();
        this.time = time;
        this.initializer = initializer != null ? initializer : admin -> {};
    }

    public void start() {
        log.info("Starting KafkaBasedLog with topic " + this.topic);
        this.admin = this.topicAdminSupplier.get();
        this.initializer.accept(this.admin);
        this.producer = this.createProducer();
        this.consumer = this.createConsumer();
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        List<PartitionInfo> partitionInfos = this.consumer.partitionsFor(this.topic);
        long started = this.time.nanoseconds();
        long sleepMs = 100L;
        while (partitionInfos.isEmpty() && this.time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
            this.time.sleep(sleepMs);
            sleepMs = Math.min(2L * sleepMs, MAX_SLEEP_MS);
            partitionInfos = this.consumer.partitionsFor(this.topic);
        }
        if (partitionInfos.isEmpty()) {
            throw new ConnectException("Could not look up partition metadata for offset backing store topic in allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.");
        }
        for (PartitionInfo partition : partitionInfos) {
            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        this.partitionCount = partitions.size();
        this.consumer.assign(partitions);
        this.consumer.seekToBeginning(partitions);
        this.readToLogEnd(true);
        this.thread = new WorkThread();
        this.thread.start();
        log.info("Finished reading KafkaBasedLog for topic " + this.topic);
        log.info("Started KafkaBasedLog for topic " + this.topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        log.info("Stopping KafkaBasedLog for topic " + this.topic);
        KafkaBasedLog kafkaBasedLog = this;
        synchronized (kafkaBasedLog) {
            this.stopRequested = true;
        }
        this.consumer.wakeup();
        try {
            this.thread.join();
        }
        catch (InterruptedException e) {
            throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting down it's producer and consumer.", e);
        }
        try {
            this.producer.close();
        }
        catch (KafkaException e) {
            log.error("Failed to stop KafkaBasedLog producer", e);
        }
        try {
            this.consumer.close();
        }
        catch (KafkaException e) {
            log.error("Failed to stop KafkaBasedLog consumer", e);
        }
        this.admin = null;
        log.info("Stopped KafkaBasedLog for topic " + this.topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void readToEnd(Callback<Void> callback) {
        log.trace("Starting read to end log for topic {}", (Object)this.topic);
        this.producer.flush();
        KafkaBasedLog kafkaBasedLog = this;
        synchronized (kafkaBasedLog) {
            this.readLogEndOffsetCallbacks.add(callback);
        }
        this.consumer.wakeup();
    }

    public void flush() {
        this.producer.flush();
    }

    public Future<Void> readToEnd() {
        FutureCallback<Void> future = new FutureCallback<Void>(null);
        this.readToEnd(future);
        return future;
    }

    public void send(K key, V value) {
        this.send(key, value, null);
    }

    public void send(K key, V value, org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.producer.Callback callback) {
        this.producer.send(new ProducerRecord<K, V>(this.topic, key, value), callback);
    }

    public int partitionCount() {
        return this.partitionCount;
    }

    private Producer<K, V> createProducer() {
        this.producerConfigs.put("acks", "all");
        this.producerConfigs.put("max.in.flight.requests.per.connection", 1);
        return new KafkaProducer(this.producerConfigs);
    }

    private Consumer<K, V> createConsumer() {
        this.consumerConfigs.put("auto.offset.reset", "earliest");
        this.consumerConfigs.put("enable.auto.commit", false);
        return new KafkaConsumer(this.consumerConfigs);
    }

    private void poll(long timeoutMs) {
        try {
            ConsumerRecords<K, V> records = this.consumer.poll(Duration.ofMillis(timeoutMs));
            for (ConsumerRecord<K, V> record : records) {
                this.consumedCallback.onCompletion(null, record);
            }
        }
        catch (WakeupException e) {
            throw e;
        }
        catch (KafkaException e) {
            log.error("Error polling: " + e);
        }
    }

    private void readToLogEnd(boolean shouldRetry) {
        Set<TopicPartition> assignment = this.consumer.assignment();
        Map<TopicPartition, Long> endOffsets = this.readEndOffsets(assignment, shouldRetry);
        log.trace("Reading to end of log offsets {}", (Object)endOffsets);
        block0: while (!endOffsets.isEmpty()) {
            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<TopicPartition, Long> entry = it.next();
                TopicPartition topicPartition = entry.getKey();
                long endOffset = entry.getValue();
                long lastConsumedOffset = this.consumer.position(topicPartition);
                if (lastConsumedOffset >= endOffset) {
                    log.trace("Read to end offset {} for {}", (Object)endOffset, (Object)topicPartition);
                    it.remove();
                    continue;
                }
                log.trace("Behind end offset {} for {}; last-read offset is {}", endOffset, topicPartition, lastConsumedOffset);
                this.poll(Integer.MAX_VALUE);
                continue block0;
            }
        }
    }

    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment, boolean shouldRetry) {
        log.trace("Reading to end of offset log");
        if (this.admin != null) {
            try {
                if (shouldRetry) {
                    return this.admin.retryEndOffsets(assignment, ADMIN_CLIENT_RETRY_DURATION, ADMIN_CLIENT_RETRY_BACKOFF_MS);
                }
                return this.admin.endOffsets(assignment);
            }
            catch (UnsupportedVersionException e) {
                log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", (Object)e.getMessage());
                this.admin = null;
            }
        }
        return this.consumer.endOffsets(assignment);
    }

    private class WorkThread
    extends Thread {
        public WorkThread() {
            super("KafkaBasedLog Work Thread - " + KafkaBasedLog.this.topic);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                log.trace("{} started execution", (Object)this);
                while (true) {
                    int numCallbacks;
                    KafkaBasedLog kafkaBasedLog = KafkaBasedLog.this;
                    synchronized (kafkaBasedLog) {
                        if (KafkaBasedLog.this.stopRequested) {
                            break;
                        }
                        numCallbacks = KafkaBasedLog.this.readLogEndOffsetCallbacks.size();
                    }
                    if (numCallbacks > 0) {
                        try {
                            KafkaBasedLog.this.readToLogEnd(false);
                            log.trace("Finished read to end log for topic {}", (Object)KafkaBasedLog.this.topic);
                        }
                        catch (TimeoutException e) {
                            log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. This may occur when brokers are unavailable or unreachable. Reason: {}", (Object)KafkaBasedLog.this.topic, (Object)e.getMessage());
                            continue;
                        }
                        catch (org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.errors.RetriableException | RetriableException e) {
                            log.warn("Retriable error while reading log to end for topic '{}'. Retrying automatically. Reason: {}", (Object)KafkaBasedLog.this.topic, (Object)e.getMessage());
                            continue;
                        }
                        catch (WakeupException e) {
                            continue;
                        }
                    }
                    KafkaBasedLog e = KafkaBasedLog.this;
                    synchronized (e) {
                        for (int i = 0; i < numCallbacks; ++i) {
                            Callback cb = (Callback)KafkaBasedLog.this.readLogEndOffsetCallbacks.poll();
                            cb.onCompletion(null, null);
                        }
                    }
                    try {
                        KafkaBasedLog.this.poll(Integer.MAX_VALUE);
                    }
                    catch (WakeupException e2) {}
                }
            }
            catch (Throwable t) {
                log.error("Unexpected exception in {}", (Object)this, (Object)t);
            }
        }
    }
}

