/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;

public class InteractiveQueryService {
    private static final Log LOG = LogFactory.getLog(InteractiveQueryService.class);
    private final KafkaStreamsRegistry kafkaStreamsRegistry;
    private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;

    public InteractiveQueryService(KafkaStreamsRegistry kafkaStreamsRegistry, KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
        this.kafkaStreamsRegistry = kafkaStreamsRegistry;
        this.binderConfigurationProperties = binderConfigurationProperties;
    }

    public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType) {
        RetryTemplate retryTemplate = new RetryTemplate();
        KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
        retryTemplate.setBackOffPolicy((BackOffPolicy)backOffPolicy);
        retryTemplate.setRetryPolicy((RetryPolicy)retryPolicy);
        return (T)retryTemplate.execute(context -> {
            Object store = null;
            Set<KafkaStreams> kafkaStreams = this.kafkaStreamsRegistry.getKafkaStreams();
            Iterator<KafkaStreams> iterator = kafkaStreams.iterator();
            InvalidStateStoreException throwable = null;
            while (iterator.hasNext()) {
                try {
                    store = iterator.next().store(storeName, storeType);
                }
                catch (InvalidStateStoreException e) {
                    throwable = e;
                }
            }
            if (store != null) {
                return store;
            }
            throw new IllegalStateException("Error when retrieving state store: j " + storeName, throwable);
        });
    }

    public HostInfo getCurrentHostInfo() {
        Map configuration = this.binderConfigurationProperties.getConfiguration();
        if (configuration.containsKey("application.server")) {
            String applicationServer = (String)configuration.get("application.server");
            String[] splits = StringUtils.split((String)applicationServer, (String)":");
            return new HostInfo(splits[0], Integer.valueOf(splits[1]).intValue());
        }
        return null;
    }

    public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
        StreamsMetadata streamsMetadata = this.kafkaStreamsRegistry.getKafkaStreams().stream().map(k -> Optional.ofNullable(k.metadataForKey(store, key, serializer))).filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
        return streamsMetadata != null ? streamsMetadata.hostInfo() : null;
    }
}

