/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.EnvironmentCapable;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaAdminOperations;
import org.springframework.kafka.core.KafkaResourceFactory;
import org.springframework.kafka.support.TopicForRetryable;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

public class KafkaAdmin
extends KafkaResourceFactory
implements ApplicationContextAware,
SmartInitializingSingleton,
KafkaAdminOperations {
    public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(10L);
    private static final int DEFAULT_OPERATION_TIMEOUT = 30;
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaAdmin.class));
    private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger();
    private final Map<String, Object> configs;
    private ApplicationContext applicationContext;
    private Predicate<NewTopic> createOrModifyTopic = nt -> true;
    private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;
    private int operationTimeout = 30;
    private boolean fatalIfBrokerNotAvailable;
    private boolean autoCreate = true;
    private boolean initializingContext;
    private boolean modifyTopicConfigs;
    private String clusterId;

    public KafkaAdmin(Map<String, Object> config) {
        this.configs = new HashMap<String, Object>(config);
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void setCloseTimeout(int closeTimeout) {
        this.closeTimeout = Duration.ofSeconds(closeTimeout);
    }

    public void setOperationTimeout(int operationTimeout) {
        this.operationTimeout = operationTimeout;
    }

    public int getOperationTimeout() {
        return this.operationTimeout;
    }

    public void setFatalIfBrokerNotAvailable(boolean fatalIfBrokerNotAvailable) {
        this.fatalIfBrokerNotAvailable = fatalIfBrokerNotAvailable;
    }

    public void setAutoCreate(boolean autoCreate) {
        this.autoCreate = autoCreate;
    }

    public void setModifyTopicConfigs(boolean modifyTopicConfigs) {
        this.modifyTopicConfigs = modifyTopicConfigs;
    }

    public void setCreateOrModifyTopic(Predicate<NewTopic> createOrModifyTopic) {
        Assert.notNull(createOrModifyTopic, (String)"'createOrModifyTopic' cannot be null");
        this.createOrModifyTopic = createOrModifyTopic;
    }

    protected Predicate<NewTopic> getCreateOrModifyTopic() {
        return this.createOrModifyTopic;
    }

    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }

    public String getClusterId() {
        return this.clusterId;
    }

    @Override
    public Map<String, Object> getConfigurationProperties() {
        HashMap<String, Object> configs2 = new HashMap<String, Object>(this.configs);
        this.checkBootstrap(configs2);
        return Collections.unmodifiableMap(configs2);
    }

    public void afterSingletonsInstantiated() {
        this.initializingContext = true;
        if (this.autoCreate) {
            this.initialize();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final boolean initialize() {
        Collection<NewTopic> newTopics = this.newTopics();
        if (newTopics.size() > 0) {
            AdminClient adminClient = null;
            try {
                adminClient = this.createAdmin();
            }
            catch (Exception e22) {
                if (!this.initializingContext || this.fatalIfBrokerNotAvailable) {
                    throw new IllegalStateException("Could not create admin", e22);
                }
                LOGGER.error((Throwable)e22, (CharSequence)"Could not create admin");
            }
            if (adminClient != null) {
                try {
                    KafkaAdmin e22 = this;
                    synchronized (e22) {
                        if (this.clusterId != null) {
                            this.clusterId = (String)adminClient.describeCluster().clusterId().get((long)this.operationTimeout, TimeUnit.SECONDS);
                        }
                    }
                    this.addOrModifyTopicsIfNeeded(adminClient, newTopics);
                    boolean e22 = true;
                    return e22;
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                catch (Exception ex) {
                    if (!this.initializingContext || this.fatalIfBrokerNotAvailable) {
                        throw new IllegalStateException("Could not configure topics", ex);
                    }
                    LOGGER.error((Throwable)ex, (CharSequence)"Could not configure topics");
                }
                finally {
                    this.initializingContext = false;
                    adminClient.close(this.closeTimeout);
                }
            }
        }
        this.initializingContext = false;
        return false;
    }

    protected Collection<NewTopic> newTopics() {
        HashMap newTopicsMap = new HashMap(this.applicationContext.getBeansOfType(NewTopic.class, false, false));
        Map wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false);
        AtomicInteger count = new AtomicInteger();
        wrappers.forEach((name, newTopics) -> newTopics.getNewTopics().forEach(nt -> newTopicsMap.put(name + "#" + count.getAndIncrement(), nt)));
        Map<String, NewTopic> topicsForRetry = newTopicsMap.entrySet().stream().filter(entry -> entry.getValue() instanceof TopicForRetryable).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        for (Map.Entry<String, NewTopic> entry2 : topicsForRetry.entrySet()) {
            Iterator iterator = newTopicsMap.entrySet().iterator();
            boolean remove = false;
            while (iterator.hasNext()) {
                Map.Entry nt = iterator.next();
                if (!((NewTopic)nt.getValue()).name().equals(entry2.getValue().name()) || nt.getValue() instanceof TopicForRetryable) continue;
                remove = true;
                break;
            }
            if (!remove) continue;
            newTopicsMap.remove(entry2.getKey());
        }
        Iterator iterator = newTopicsMap.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry next = iterator.next();
            if (this.createOrModifyTopic.test((NewTopic)next.getValue())) continue;
            iterator.remove();
        }
        return new ArrayList<NewTopic>(newTopicsMap.values());
    }

    @Override
    @Nullable
    public String clusterId() {
        if (this.clusterId == null) {
            try (AdminClient client = this.createAdmin();){
                this.clusterId = (String)client.describeCluster().clusterId().get((long)this.operationTimeout, TimeUnit.SECONDS);
                if (this.clusterId == null) {
                    this.clusterId = "null";
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            catch (Exception ex) {
                LOGGER.error((Throwable)ex, (CharSequence)"Could not obtain cluster info");
            }
        }
        return this.clusterId;
    }

    @Override
    public void createOrModifyTopics(NewTopic ... topics) {
        try (AdminClient client = this.createAdmin();){
            this.addOrModifyTopicsIfNeeded(client, Arrays.asList(topics));
        }
    }

    /*
     * Exception decompiling
     */
    @Override
    public Map<String, TopicDescription> describeTopics(String ... topicNames) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [4[CATCHBLOCK]], but top level block is 2[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    AdminClient createAdmin() {
        return AdminClient.create(this.getAdminConfig());
    }

    protected Map<String, Object> getAdminConfig() {
        HashMap<String, Object> configs2 = new HashMap<String, Object>(this.configs);
        this.checkBootstrap(configs2);
        if (!configs2.containsKey("client.id")) {
            Optional.ofNullable(this.applicationContext).map(EnvironmentCapable::getEnvironment).map(environment -> environment.getProperty("spring.application.name")).ifPresent(applicationName -> configs2.put("client.id", applicationName + "-admin-" + CLIENT_ID_COUNTER.getAndIncrement()));
        }
        return configs2;
    }

    private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
        if (topics.size() > 0) {
            HashMap<String, NewTopic> topicNameToTopic = new HashMap<String, NewTopic>();
            topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
            DescribeTopicsResult topicInfo = adminClient.describeTopics((Collection)topics.stream().map(NewTopic::name).collect(Collectors.toList()));
            ArrayList<NewTopic> topicsToAdd = new ArrayList<NewTopic>();
            Map<String, NewPartitions> topicsWithPartitionMismatches = this.checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
            if (topicsToAdd.size() > 0) {
                this.addTopics(adminClient, topicsToAdd);
            }
            if (topicsWithPartitionMismatches.size() > 0) {
                this.createMissingPartitions(adminClient, topicsWithPartitionMismatches);
            }
            if (this.modifyTopicConfigs) {
                LinkedList<NewTopic> toCheck = new LinkedList<NewTopic>(topics);
                toCheck.removeAll(topicsToAdd);
                Map<ConfigResource, List<ConfigEntry>> mismatchingConfigs = this.checkTopicsForConfigMismatches(adminClient, toCheck);
                if (!mismatchingConfigs.isEmpty()) {
                    this.adjustConfigMismatches(adminClient, topics, mismatchingConfigs);
                }
            }
        }
    }

    private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(AdminClient adminClient, Collection<NewTopic> topics) {
        List configResources = topics.stream().map(topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name())).collect(Collectors.toList());
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(configResources);
        try {
            Map topicsConfig = (Map)describeConfigsResult.all().get((long)this.operationTimeout, TimeUnit.SECONDS);
            HashMap<ConfigResource, List<ConfigEntry>> configMismatches = new HashMap<ConfigResource, List<ConfigEntry>>();
            for (Map.Entry topicConfig : topicsConfig.entrySet()) {
                Optional<NewTopic> topicOptional = topics.stream().filter(p -> p.name().equals(((ConfigResource)topicConfig.getKey()).name())).findFirst();
                ArrayList<ConfigEntry> configMismatchesEntries = new ArrayList<ConfigEntry>();
                if (!topicOptional.isPresent() || topicOptional.get().configs() == null) continue;
                for (Map.Entry desiredConfigParameter : topicOptional.get().configs().entrySet()) {
                    ConfigEntry actualConfigParameter = ((Config)topicConfig.getValue()).get((String)desiredConfigParameter.getKey());
                    if (actualConfigParameter == null) {
                        throw new IllegalStateException("Topic property '" + (String)desiredConfigParameter.getKey() + "' does not exist");
                    }
                    if (((String)desiredConfigParameter.getValue()).equals(actualConfigParameter.value())) continue;
                    configMismatchesEntries.add(actualConfigParameter);
                }
                if (configMismatchesEntries.size() <= 0) continue;
                configMismatches.put((ConfigResource)topicConfig.getKey(), configMismatchesEntries);
            }
            return configMismatches;
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new KafkaException("Interrupted while getting topic descriptions:" + topics, ie);
        }
        catch (ExecutionException | TimeoutException ex) {
            throw new KafkaException("Failed to obtain topic descriptions:" + topics, ex);
        }
    }

    private void adjustConfigMismatches(AdminClient adminClient, Collection<NewTopic> topics, Map<ConfigResource, List<ConfigEntry>> mismatchingConfigs) {
        for (Map.Entry<ConfigResource, List<ConfigEntry>> mismatchingConfigsOfTopic : mismatchingConfigs.entrySet()) {
            ConfigResource topicConfigResource = mismatchingConfigsOfTopic.getKey();
            Optional<NewTopic> topicOptional = topics.stream().filter(p -> p.name().equals(topicConfigResource.name())).findFirst();
            if (!topicOptional.isPresent()) continue;
            for (ConfigEntry mismatchConfigEntry : mismatchingConfigsOfTopic.getValue()) {
                ArrayList<AlterConfigOp> alterConfigOperations = new ArrayList<AlterConfigOp>();
                Map desiredConfigs = topicOptional.get().configs();
                if (desiredConfigs.get(mismatchConfigEntry.name()) != null) {
                    alterConfigOperations.add(new AlterConfigOp(new ConfigEntry(mismatchConfigEntry.name(), (String)desiredConfigs.get(mismatchConfigEntry.name())), AlterConfigOp.OpType.SET));
                }
                if (alterConfigOperations.size() <= 0) continue;
                try {
                    AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(Map.of(topicConfigResource, alterConfigOperations));
                    alterConfigsResult.all().get((long)this.operationTimeout, TimeUnit.SECONDS);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new KafkaException("Interrupted while getting topic descriptions", ie);
                }
                catch (ExecutionException | TimeoutException ex) {
                    throw new KafkaException("Failed to obtain topic descriptions", ex);
                }
            }
        }
    }

    private Map<String, NewPartitions> checkPartitions(Map<String, NewTopic> topicNameToTopic, DescribeTopicsResult topicInfo, List<NewTopic> topicsToAdd) {
        HashMap<String, NewPartitions> topicsToModify = new HashMap<String, NewPartitions>();
        topicInfo.topicNameValues().forEach((n, f) -> {
            NewTopic topic = (NewTopic)topicNameToTopic.get(n);
            try {
                TopicDescription topicDescription = (TopicDescription)f.get((long)this.operationTimeout, TimeUnit.SECONDS);
                if (topic.numPartitions() >= 0 && topic.numPartitions() < topicDescription.partitions().size()) {
                    LOGGER.info(() -> String.format("Topic '%s' exists but has a different partition count: %d not %d", n, topicDescription.partitions().size(), topic.numPartitions()));
                } else if (topic.numPartitions() > topicDescription.partitions().size()) {
                    LOGGER.info(() -> String.format("Topic '%s' exists but has a different partition count: %d not %d, increasing if the broker supports it", n, topicDescription.partitions().size(), topic.numPartitions()));
                    topicsToModify.put((String)n, NewPartitions.increaseTo((int)topic.numPartitions()));
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (TimeoutException e) {
                throw new KafkaException("Timed out waiting to get existing topics", e);
            }
            catch (ExecutionException e) {
                topicsToAdd.add(topic);
            }
        });
        return topicsToModify;
    }

    private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
        CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd);
        try {
            topicResults.all().get((long)this.operationTimeout, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error((Throwable)e, (CharSequence)"Interrupted while waiting for topic creation results");
        }
        catch (TimeoutException e) {
            throw new KafkaException("Timed out waiting for create topics results", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                LOGGER.debug(e.getCause(), (CharSequence)"Failed to create topics");
            }
            LOGGER.error(e.getCause(), (CharSequence)"Failed to create topics");
            throw new KafkaException("Failed to create topics", e.getCause());
        }
    }

    private void createMissingPartitions(AdminClient adminClient, Map<String, NewPartitions> topicsToModify) {
        block5: {
            CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify);
            try {
                partitionsResult.all().get((long)this.operationTimeout, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.error((Throwable)e, (CharSequence)"Interrupted while waiting for partition creation results");
            }
            catch (TimeoutException e) {
                throw new KafkaException("Timed out waiting for create partitions results", e);
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof InvalidPartitionsException) {
                    LOGGER.debug(e.getCause(), (CharSequence)"Failed to create partitions");
                }
                LOGGER.error(e.getCause(), (CharSequence)"Failed to create partitions");
                if (e.getCause() instanceof UnsupportedVersionException) break block5;
                throw new KafkaException("Failed to create partitions", e.getCause());
            }
        }
    }

    public static class NewTopics {
        private final Collection<NewTopic> newTopics = new ArrayList<NewTopic>();

        public NewTopics(NewTopic ... newTopics) {
            this.newTopics.addAll(Arrays.asList(newTopics));
        }

        Collection<NewTopic> getNewTopics() {
            return this.newTopics;
        }
    }
}

