/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaUnboundedSource<@UnknownKeyFor K, @UnknownKeyFor V>
extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);
    private final  @UnknownKeyFor @NonNull @Initialized KafkaIO.Read<K, V> spec;
    private final @UnknownKeyFor @NonNull @Initialized int id;

    /*
     * WARNING - void declaration
     */
    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized KafkaUnboundedSource<K, V>> split(@UnknownKeyFor @NonNull @Initialized int desiredNumSplits, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized Exception {
        void var8_30;
        void var7_21;
        void var7_19;
        int numSplits;
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>((Collection)Preconditions.checkStateNotNull(this.spec.getTopicPartitions()));
        String bootStrapServers = (String)Preconditions.checkArgumentNotNull((Object)this.spec.getConsumerConfig().get("bootstrap.servers"));
        if (partitions.isEmpty()) {
            try (Consumer consumer = (Consumer)this.spec.getConsumerFactoryFn().apply(this.spec.getConsumerConfig());){
                List topics = (List)Preconditions.checkStateNotNull(this.spec.getTopics());
                if (topics.isEmpty()) {
                    Pattern pattern = (Pattern)Preconditions.checkStateNotNull((Object)this.spec.getTopicPattern());
                    for (Map.Entry entry : consumer.listTopics().entrySet()) {
                        if (!pattern.matcher((CharSequence)entry.getKey()).matches()) continue;
                        for (PartitionInfo p : (List)entry.getValue()) {
                            partitions.add(new TopicPartition(p.topic(), p.partition()));
                            Lineage.getSources().add("kafka", (Iterable)ImmutableList.of((Object)bootStrapServers, (Object)p.topic()));
                        }
                    }
                }
                Iterator iterator = topics.iterator();
                while (iterator.hasNext()) {
                    String string = (String)iterator.next();
                    List partitionInfoList = consumer.partitionsFor(string);
                    if (this.spec.getLogTopicVerification() == null || !this.spec.getLogTopicVerification().booleanValue()) {
                        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState((partitionInfoList != null && !partitionInfoList.isEmpty() ? 1 : 0) != 0, (Object)"Could not find any partitions info. Please check Kafka configuration and make sure that provided topics exist.");
                    } else {
                        LOG.warn("Could not find any partitions info. Please check Kafka configuration and make sure that the provided topics exist.");
                    }
                    for (PartitionInfo p : partitionInfoList) {
                        partitions.add(new TopicPartition(p.topic(), p.partition()));
                    }
                    Lineage.getSources().add("kafka", (Iterable)ImmutableList.of((Object)bootStrapServers, (Object)string));
                }
            }
        } else {
            HashMap<String, List> topicsAndPartitions = new HashMap<String, List>();
            for (TopicPartition topicPartition : partitions) {
                topicsAndPartitions.computeIfAbsent(topicPartition.topic(), k -> new ArrayList()).add(topicPartition.partition());
            }
            try (Consumer consumer = (Consumer)this.spec.getConsumerFactoryFn().apply(this.spec.getConsumerConfig());){
                for (Map.Entry entry : topicsAndPartitions.entrySet()) {
                    String providedTopic = (String)entry.getKey();
                    List providedPartitions = (List)entry.getValue();
                    try {
                        Set partitionsForTopic = consumer.partitionsFor(providedTopic).stream().map(PartitionInfo::partition).collect(Collectors.toSet());
                        if (this.spec.getLogTopicVerification() == null || !this.spec.getLogTopicVerification().booleanValue()) {
                            for (Integer p : providedPartitions) {
                                org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState((boolean)partitionsForTopic.contains(p), (Object)("Partition " + p + " does not exist for topic " + providedTopic + ". Please check Kafka configuration."));
                            }
                        } else {
                            for (Integer p : providedPartitions) {
                                if (partitionsForTopic.contains(p)) continue;
                                LOG.warn("Partition {} does not exist for topic {}. Please check Kafka configuration.", (Object)p, (Object)providedTopic);
                            }
                        }
                    }
                    catch (KafkaException exception) {
                        LOG.warn("Unable to access cluster. Skipping fail fast checks.");
                    }
                    Lineage.getSources().add("kafka", (Iterable)ImmutableList.of((Object)bootStrapServers, (Object)providedTopic));
                }
            }
            catch (KafkaException exception) {
                LOG.warn("WARN: Failed to connect to kafka for running pre-submit validation of kafka topic and partition configuration. This may be due to local permissions or connectivity to the kafka bootstrap server, or due to misconfiguration of KafkaIO. This validation is not required, and this warning may be ignored if the Beam job runs successfully.");
            }
        }
        partitions.sort(Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition));
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument((desiredNumSplits > 0 ? 1 : 0) != 0);
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState((partitions.size() > 0 ? 1 : 0) != 0, (Object)"Could not find any partitions. Please check Kafka configuration and topic names");
        if (this.offsetBasedDeduplicationSupported()) {
            numSplits = partitions.size();
            LOG.info("Offset-based deduplication is enabled for KafkaUnboundedSource. Forcing the number of splits to equal the number of total partitions: {}.", (Object)numSplits);
        } else {
            numSplits = Math.min(desiredNumSplits, partitions.size());
            while (partitions.size() % numSplits > 0) {
                ++numSplits;
            }
        }
        ArrayList assignments = new ArrayList(numSplits);
        boolean bl = false;
        while (var7_19 < numSplits) {
            assignments.add(new ArrayList());
            ++var7_19;
        }
        boolean bl2 = false;
        while (var7_21 < partitions.size()) {
            ((List)assignments.get((int)(var7_21 % numSplits))).add((TopicPartition)partitions.get((int)var7_21));
            ++var7_21;
        }
        ArrayList<KafkaUnboundedSource<K, V>> arrayList = new ArrayList<KafkaUnboundedSource<K, V>>(numSplits);
        boolean bl3 = false;
        while (var8_30 < numSplits) {
            List assignedToSplit = (List)assignments.get((int)var8_30);
            LOG.info("Partitions assigned to split {} (total {}): {}", new Object[]{(int)var8_30, assignedToSplit.size(), Joiner.on((String)",").join((Iterable)assignedToSplit)});
            arrayList.add(new KafkaUnboundedSource<K, V>(this.spec.toBuilder().setTopics(Collections.emptyList()).setTopicPartitions(assignedToSplit).build(), (int)var8_30));
            ++var8_30;
        }
        return arrayList;
    }

    public @UnknownKeyFor @NonNull @Initialized KafkaUnboundedReader<K, V> createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @Nullable @UnknownKeyFor @Initialized KafkaCheckpointMark checkpointMark) {
        Preconditions.checkStateNotNull(this.spec.getTopicPartitions());
        if (this.spec.getTopicPartitions().isEmpty()) {
            LOG.warn("Looks like generateSplits() is not called. Generate single split.");
            try {
                return new KafkaUnboundedReader<K, V>(this.split(1, options).get(0), checkpointMark);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return new KafkaUnboundedReader(this, checkpointMark);
    }

    public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized KafkaCheckpointMark> getCheckpointMarkCoder() {
        return AvroCoder.of(KafkaCheckpointMark.class);
    }

    public @UnknownKeyFor @NonNull @Initialized boolean requiresDeduping() {
        return false;
    }

    public @UnknownKeyFor @NonNull @Initialized boolean offsetBasedDeduplicationSupported() {
        return this.spec.getOffsetDeduplication() != null && this.spec.getOffsetDeduplication() != false;
    }

    public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>> getOutputCoder() {
        Coder keyCoder = (Coder)Preconditions.checkStateNotNull(this.spec.getKeyCoder());
        Coder valueCoder = (Coder)Preconditions.checkStateNotNull(this.spec.getValueCoder());
        return KafkaRecordCoder.of(keyCoder, valueCoder);
    }

    public KafkaUnboundedSource( @UnknownKeyFor @NonNull @Initialized KafkaIO.Read<K, V> spec, @UnknownKeyFor @NonNull @Initialized int id) {
        this.spec = spec;
        this.id = id;
    }

     @UnknownKeyFor @NonNull @Initialized KafkaIO.Read<K, V> getSpec() {
        return this.spec;
    }

    @UnknownKeyFor @NonNull @Initialized int getId() {
        return this.id;
    }
}

