/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;

public class InteractiveQueryService {
    private static final Log LOG = LogFactory.getLog(InteractiveQueryService.class);
    private final KafkaStreamsRegistry kafkaStreamsRegistry;
    private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;

    public InteractiveQueryService(KafkaStreamsRegistry kafkaStreamsRegistry, KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
        this.kafkaStreamsRegistry = kafkaStreamsRegistry;
        this.binderConfigurationProperties = binderConfigurationProperties;
    }

    public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType) {
        RetryTemplate retryTemplate = new RetryTemplate();
        KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
        retryTemplate.setBackOffPolicy((BackOffPolicy)backOffPolicy);
        retryTemplate.setRetryPolicy((RetryPolicy)retryPolicy);
        KafkaStreams contextSpecificKafkaStreams = this.getThreadContextSpecificKafkaStreams();
        return (T)retryTemplate.execute(context -> {
            Object store = null;
            InvalidStateStoreException throwable = null;
            if (contextSpecificKafkaStreams != null) {
                try {
                    store = contextSpecificKafkaStreams.store(StoreQueryParameters.fromNameAndType((String)storeName, (QueryableStoreType)storeType));
                }
                catch (InvalidStateStoreException e) {
                    throwable = e;
                }
            }
            if (store != null) {
                return store;
            }
            if (contextSpecificKafkaStreams != null) {
                LOG.warn((Object)("Store " + storeName + " could not be found in Streams context, falling back to all known Streams instances"));
            }
            Set<KafkaStreams> kafkaStreams = this.kafkaStreamsRegistry.getKafkaStreams();
            Iterator<KafkaStreams> iterator = kafkaStreams.iterator();
            while (iterator.hasNext()) {
                try {
                    store = iterator.next().store(StoreQueryParameters.fromNameAndType((String)storeName, (QueryableStoreType)storeType));
                }
                catch (InvalidStateStoreException e) {
                    throwable = e;
                }
            }
            if (store != null) {
                return store;
            }
            throw new IllegalStateException("Error when retrieving state store: " + storeName, throwable);
        });
    }

    private KafkaStreams getThreadContextSpecificKafkaStreams() {
        return this.kafkaStreamsRegistry.getKafkaStreams().stream().filter(this::filterByThreadName).findAny().orElse(null);
    }

    private boolean filterByThreadName(KafkaStreams streams) {
        String applicationId = this.kafkaStreamsRegistry.streamBuilderFactoryBean(streams).getStreamsConfiguration().getProperty("application.id");
        return Thread.currentThread().getName().contains(applicationId);
    }

    public HostInfo getCurrentHostInfo() {
        Map configuration = this.binderConfigurationProperties.getConfiguration();
        if (configuration.containsKey("application.server")) {
            String applicationServer = (String)configuration.get("application.server");
            String[] splits = StringUtils.split((String)applicationServer, (String)":");
            return new HostInfo(splits[0], Integer.valueOf(splits[1]).intValue());
        }
        return null;
    }

    public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
        KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams().stream().map(k -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer))).filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
        return keyQueryMetadata != null ? keyQueryMetadata.getActiveHost() : null;
    }

    public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer) {
        return this.kafkaStreamsRegistry.getKafkaStreams().stream().map(k -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer))).filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
    }

    public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer) {
        AtomicReference kafkaStreamsAtomicReference = new AtomicReference();
        this.kafkaStreamsRegistry.getKafkaStreams().forEach(k -> {
            KeyQueryMetadata keyQueryMetadata = k.queryMetadataForKey(store, key, serializer);
            if (keyQueryMetadata != null) {
                kafkaStreamsAtomicReference.set(k);
            }
        });
        return (KafkaStreams)kafkaStreamsAtomicReference.get();
    }

    public List<HostInfo> getAllHostsInfo(String store) {
        return this.kafkaStreamsRegistry.getKafkaStreams().stream().flatMap(k -> k.allMetadataForStore(store).stream()).filter(Objects::nonNull).map(StreamsMetadata::hostInfo).collect(Collectors.toList());
    }
}

