/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.slf4j.Logger;

/**
 * Computes the configuration of the repartition topics of the topologies described by a
 * {@link TopologyMetadata}, asks the {@link InternalTopicManager} to make those topics ready,
 * and records a {@link PartitionInfo} for every partition of every repartition topic.
 *
 * <p>Subtopologies whose external source topics are absent from the supplied {@link Cluster}
 * metadata are remembered and can be queried through {@link #topologiesWithMissingInputTopics()}
 * and {@link #missingSourceTopicExceptions()}.
 *
 * <p>Results are accumulated in instance fields that are never cleared by this class, so calling
 * {@link #setup()} more than once on the same instance adds to the earlier results.
 */
public class RepartitionTopics {
    private final InternalTopicManager internalTopicManager;
    private final TopologyMetadata topologyMetadata;
    private final Cluster clusterMetadata;
    private final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
    private final Logger log;
    // Filled by setup(): one entry per partition of every repartition topic that was made ready.
    private final Map<TopicPartition, PartitionInfo> topicPartitionInfos = new HashMap<>();
    // Filled by computeRepartitionTopicConfig(): external source topics not found in the cluster metadata.
    private final Map<TopologyMetadata.Subtopology, Set<String>> missingInputTopicsBySubtopology = new HashMap<>();

    /**
     * @param topologyMetadata            source of the per-topology topics info and of the copartition groups
     * @param internalTopicManager        used to make the computed repartition topics ready
     * @param copartitionedTopicsEnforcer applied to every copartition group before the topics are made ready
     * @param clusterMetadata             cluster view used to look up existing topics and their partition counts
     * @param logPrefix                   prefix handed to the {@link LogContext} that creates this class's logger
     */
    public RepartitionTopics(TopologyMetadata topologyMetadata, InternalTopicManager internalTopicManager, CopartitionedTopicsEnforcer copartitionedTopicsEnforcer, Cluster clusterMetadata, String logPrefix) {
        this.topologyMetadata = topologyMetadata;
        this.internalTopicManager = internalTopicManager;
        this.clusterMetadata = clusterMetadata;
        this.copartitionedTopicsEnforcer = copartitionedTopicsEnforcer;
        LogContext logContext = new LogContext(logPrefix);
        this.log = logContext.logger(this.getClass());
    }

    /**
     * Computes the repartition topic configs, enforces copartitioning on them, makes the topics
     * ready through the {@link InternalTopicManager} and records partition info for each of them.
     * When no repartition topic config was computed, nothing but an informational log is produced.
     *
     * @throws TaskAssignmentException if the partition count of some repartition topic cannot be determined
     */
    public void setup() {
        Map<String, InternalTopicConfig> repartitionTopicMetadata = computeRepartitionTopicConfig(clusterMetadata);

        if (repartitionTopicMetadata.isEmpty()) {
            if (missingInputTopicsBySubtopology.isEmpty()) {
                log.info("Skipping the repartition topic validation since there are no repartition topics.");
            } else {
                log.info("Skipping the repartition topic validation since all topologies containing repartition topics are missing external user source topics and cannot be processed.");
            }
            return;
        }

        ensureCopartitioning(topologyMetadata.copartitionGroups(), repartitionTopicMetadata, clusterMetadata);
        internalTopicManager.makeReady(repartitionTopicMetadata);

        for (Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
            String topic = entry.getKey();
            // An absent partition count maps to -1, which makes the loop below a no-op for that topic.
            int numPartitions = entry.getValue().numberOfPartitions().orElse(-1);
            for (int partition = 0; partition < numPartitions; ++partition) {
                // Leader and replica information is not known here, hence the null leader and empty node arrays.
                topicPartitionInfos.put(
                    new TopicPartition(topic, partition),
                    new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
            }
        }
    }

    /**
     * @return the names (as produced by {@link TopologyMetadata#getTopologyNameOrElseUnnamed}) of the
     *         topologies owning at least one subtopology recorded as missing input topics
     */
    public Set<String> topologiesWithMissingInputTopics() {
        return missingInputTopicsBySubtopology.keySet().stream()
            .map(subtopology -> TopologyMetadata.getTopologyNameOrElseUnnamed(subtopology.namedTopology))
            .collect(Collectors.toSet());
    }

    /**
     * @return one {@link StreamsException} per subtopology recorded as missing input topics; each wraps a
     *         {@link MissingSourceTopicException} and carries a {@link TaskId} built from the subtopology id,
     *         partition 0 and the topology name
     */
    public Queue<StreamsException> missingSourceTopicExceptions() {
        return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
            Set<String> missingSourceTopics = entry.getValue();
            int subtopologyId = entry.getKey().nodeGroupId;
            String topologyName = entry.getKey().namedTopology;
            return new StreamsException(
                new MissingSourceTopicException(String.format(
                    "Missing source topics %s for subtopology %d of topology %s",
                    missingSourceTopics, subtopologyId, topologyName)),
                new TaskId(subtopologyId, 0, topologyName));
        }).collect(Collectors.toCollection(LinkedList::new));
    }

    /**
     * @return a read-only view of the partition info recorded by {@link #setup()}
     */
    public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
        return Collections.unmodifiableMap(topicPartitionInfos);
    }

    /**
     * Collects the repartition topic configs of every topology that has no missing external source
     * topic, then resolves the partition count of those repartition topics. Subtopologies with missing
     * source topics are recorded in {@code missingInputTopicsBySubtopology}, and the whole topology
     * they belong to is left out of the result.
     *
     * @param clusterMetadata cluster view used to detect missing topics and to read partition counts
     * @return repartition topic configs keyed by topic name
     */
    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(Cluster clusterMetadata) {
        Set<InternalTopologyBuilder.TopicsInfo> allTopicsInfo = new HashSet<>();
        Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();

        for (Map.Entry<String, Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo>> topologyEntry
                : topologyMetadata.topologyToSubtopologyTopicsInfoMap().entrySet()) {
            // Only used for logging; null when the application has no named topologies.
            String topologyName = topologyMetadata.hasNamedTopologies() ? topologyEntry.getKey() : null;

            Set<InternalTopologyBuilder.TopicsInfo> topicsInfoForTopology = new HashSet<>();
            Set<String> missingSourceTopicsForTopology = new HashSet<>();
            Map<String, InternalTopicConfig> repartitionTopicConfigsForTopology = new HashMap<>();

            for (Map.Entry<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyEntry
                    : topologyEntry.getValue().entrySet()) {
                InternalTopologyBuilder.TopicsInfo topicsInfo = subtopologyEntry.getValue();
                topicsInfoForTopology.add(topicsInfo);
                repartitionTopicConfigsForTopology.putAll(
                    topicsInfo.repartitionSourceTopics.values().stream()
                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));

                Set<String> missingSourceTopicsForSubtopology = computeMissingExternalSourceTopics(topicsInfo, clusterMetadata);
                missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);

                if (!missingSourceTopicsForSubtopology.isEmpty()) {
                    TopologyMetadata.Subtopology subtopology = subtopologyEntry.getKey();
                    missingInputTopicsBySubtopology.put(subtopology, missingSourceTopicsForSubtopology);
                    log.error("Subtopology {} has missing source topics {} and will be excluded from the current assignment, "
                            + "this can be due to the consumer client's metadata being stale or because they have not been created yet. "
                            + "Please verify that you have created all input topics; if they do exist, you just need to wait for the metadata to be updated, "
                            + "at which time a new rebalance will be kicked off automatically and the topology will be retried at that time.",
                        subtopology.nodeGroupId, missingSourceTopicsForSubtopology);
                }
            }

            if (missingSourceTopicsForTopology.isEmpty()) {
                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsForTopology);
                allTopicsInfo.addAll(topicsInfoForTopology);
            } else {
                log.debug("Skipping repartition topic validation for entire topology {} due to missing source topics {}",
                    topologyName, missingSourceTopicsForTopology);
            }
        }

        setRepartitionSourceTopicPartitionCount(allRepartitionTopicConfigs, allTopicsInfo, clusterMetadata);
        return allRepartitionTopicConfigs;
    }

    /**
     * Runs the {@link CopartitionedTopicsEnforcer} over every copartition group.
     */
    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<String, InternalTopicConfig> repartitionTopicMetadata, Cluster clusterMetadata) {
        for (Set<String> copartitionGroup : copartitionGroups) {
            copartitionedTopicsEnforcer.enforce(copartitionGroup, repartitionTopicMetadata, clusterMetadata);
        }
    }

    /**
     * @return the source topics of {@code topicsInfo} that are neither repartition source topics
     *         nor present in {@code clusterMetadata.topics()}
     */
    private Set<String> computeMissingExternalSourceTopics(InternalTopologyBuilder.TopicsInfo topicsInfo, Cluster clusterMetadata) {
        Set<String> missingExternalSourceTopics = new HashSet<>(topicsInfo.sourceTopics);
        missingExternalSourceTopics.removeAll(topicsInfo.repartitionSourceTopics.keySet());
        missingExternalSourceTopics.removeAll(clusterMetadata.topics());
        return missingExternalSourceTopics;
    }

    /**
     * Sets the partition count of every repartition source topic that does not have one yet.
     * The count of a topic may depend on other repartition topics whose count is itself still
     * unknown, so the topic groups are swept repeatedly until every count is known.
     *
     * @throws TaskAssignmentException if a full sweep resolves nothing while counts are still missing
     */
    private void setRepartitionSourceTopicPartitionCount(Map<String, InternalTopicConfig> repartitionTopicMetadata, Collection<InternalTopologyBuilder.TopicsInfo> topicGroups, Cluster clusterMetadata) {
        boolean partitionCountNeeded;
        do {
            partitionCountNeeded = false;
            boolean progressMadeThisIteration = false;

            for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups) {
                for (String repartitionSourceTopic : topicsInfo.repartitionSourceTopics.keySet()) {
                    InternalTopicConfig repartitionTopicConfig = repartitionTopicMetadata.get(repartitionSourceTopic);
                    if (repartitionTopicConfig.numberOfPartitions().isPresent()) {
                        continue;
                    }

                    Integer numPartitions = computePartitionCount(repartitionTopicMetadata, topicGroups, clusterMetadata, repartitionSourceTopic);
                    if (numPartitions == null) {
                        partitionCountNeeded = true;
                        log.trace("Unable to determine number of partitions for {}, another iteration is needed", repartitionSourceTopic);
                    } else {
                        log.trace("Determined number of partitions for {} to be {}", repartitionSourceTopic, numPartitions);
                        repartitionTopicConfig.setNumberOfPartitions(numPartitions);
                        progressMadeThisIteration = true;
                    }
                }
            }

            // A sweep that resolved nothing while counts are still missing can never converge: fail instead of spinning.
            if (!progressMadeThisIteration && partitionCountNeeded) {
                log.error("Unable to determine the number of partitions of all repartition topics, most likely a source topic is missing or pattern doesn't match any topics\n"
                        + "topic groups: {}\n"
                        + "cluster topics: {}.",
                    topicGroups, clusterMetadata.topics());
                throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics, make sure all user input topics are created and all Pattern subscriptions match at least one topic in the cluster");
            }
        } while (partitionCountNeeded);
    }

    /**
     * Computes the partition count of {@code repartitionSourceTopic} as the maximum partition count
     * among the source topics of every topic group that has this topic as a sink.
     *
     * @return the largest known upstream partition count, or {@code null} when no upstream count is known yet
     * @throws TaskAssignmentException if an upstream topic that is not a repartition topic has no
     *                                 partition count in the cluster metadata
     */
    private Integer computePartitionCount(Map<String, InternalTopicConfig> repartitionTopicMetadata, Collection<InternalTopologyBuilder.TopicsInfo> topicGroups, Cluster clusterMetadata, String repartitionSourceTopic) {
        Integer partitionCount = null;

        for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups) {
            if (!topicsInfo.sinkTopics.contains(repartitionSourceTopic)) {
                continue;
            }

            for (String upstreamSourceTopic : topicsInfo.sourceTopics) {
                Integer numPartitionsCandidate;
                if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
                    // Upstream is itself a repartition topic: its count may still be unknown (null) in this sweep.
                    numPartitionsCandidate = repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().orElse(null);
                } else {
                    numPartitionsCandidate = clusterMetadata.partitionCountForTopic(upstreamSourceTopic);
                    if (numPartitionsCandidate == null) {
                        throw new TaskAssignmentException("No partition count found for source topic " + upstreamSourceTopic + ", but it should have been.");
                    }
                }

                if (numPartitionsCandidate != null && (partitionCount == null || numPartitionsCandidate > partitionCount)) {
                    partitionCount = numPartitionsCandidate;
                }
            }
        }
        return partitionCount;
    }
}

