/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StreamsKafkaClient}.
 *
 * <p>Most tests build the client on top of a {@link MockClient} so that the requests it sends can
 * be inspected and canned responses can be returned without a real broker.
 */
public class StreamsKafkaClientTest {
    private static final String TOPIC = "topic";

    private final MockClient kafkaClient = new MockClient(new MockTime());
    private final List<MetricsReporter> reporters = Collections.emptyList();
    private final MetadataResponse metadata = new MetadataResponse(Collections.singletonList(new Node(1, "host", 90)), "cluster", 1, Collections.emptyList());
    private final Map<String, Object> config = new HashMap<>();
    private final InternalTopicConfig topicConfigWithNoOverrides = new InternalTopicConfig(TOPIC, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.emptyMap());
    private final Map<String, String> overridenTopicConfig = Collections.singletonMap("delete.retention.ms", "100");
    private final InternalTopicConfig topicConfigWithOverrides = new InternalTopicConfig(TOPIC, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), overridenTopicConfig);

    /** Populates the minimal configuration shared by every test: application id and bootstrap servers. */
    @Before
    public void before() {
        config.put("application.id", "some_app_id");
        config.put("bootstrap.servers", "localhost:9000");
    }

    /**
     * The {@code sasl.mechanism} value set on the streams config must be readable from the derived
     * client config, both through {@code values()} and through {@code getString(...)}.
     */
    @Test
    public void testConfigFromStreamsConfig() {
        for (final String expectedMechanism : Arrays.asList("PLAIN", "SCRAM-SHA-512")) {
            config.put("sasl.mechanism", expectedMechanism);
            final StreamsConfig streamsConfig = new StreamsConfig(config);
            // Named clientConfig so it does not shadow the `config` field.
            final StreamsKafkaClient.Config clientConfig = StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig);
            Assert.assertEquals(expectedMechanism, clientConfig.values().get("sasl.mechanism"));
            Assert.assertEquals(expectedMechanism, clientConfig.getString("sasl.mechanism"));
        }
    }

    /** With no overrides anywhere, the only requested topic config is the cleanup policy. */
    @Test
    public void shouldAddCleanupPolicyToTopicConfigWhenCreatingTopic() throws Exception {
        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy", "delete"));
    }

    /** Topic-prefixed entries of the streams config are expected in the create-topics request. */
    @Test
    public void shouldAddDefaultTopicConfigFromStreamConfig() throws Exception {
        config.put(StreamsConfig.topicPrefix("segment.ms"), "100");
        config.put(StreamsConfig.topicPrefix("compression.type"), "gzip");

        final Map<String, String> expectedConfigs = new HashMap<>();
        expectedConfigs.put("segment.ms", "100");
        expectedConfigs.put("compression.type", "gzip");
        expectedConfigs.put("cleanup.policy", "delete");

        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, expectedConfigs);
    }

    /** Properties carried by the {@link InternalTopicConfig} itself are expected in the request. */
    @Test
    public void shouldSetPropertiesDefinedByInternalTopicConfig() throws Exception {
        final Map<String, String> expectedConfigs = new HashMap<>(overridenTopicConfig);
        expectedConfigs.put("cleanup.policy", "compact");

        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithOverrides, expectedConfigs);
    }

    /**
     * When the same key is set both in the streams config and on the {@link InternalTopicConfig},
     * the value from the {@link InternalTopicConfig} is the one expected in the request; keys set
     * only in the streams config are still expected.
     */
    @Test
    public void shouldOverrideDefaultTopicConfigsFromStreamsConfig() throws Exception {
        config.put(StreamsConfig.topicPrefix("delete.retention.ms"), "99999");
        config.put(StreamsConfig.topicPrefix("segment.ms"), "988");

        final Map<String, String> expectedConfigs = new HashMap<>(overridenTopicConfig);
        expectedConfigs.put("cleanup.policy", "compact");
        expectedConfigs.put("delete.retention.ms", "100");
        expectedConfigs.put("segment.ms", "988");

        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithOverrides, expectedConfigs);
    }

    /** A topic-prefixed entry with a {@code null} value must not show up in the requested configs. */
    @Test
    public void shouldNotAllowNullTopicConfigs() throws Exception {
        config.put(StreamsConfig.topicPrefix("delete.retention.ms"), null);
        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy", "delete"));
    }

    /**
     * Every metric reported to {@link TestMetricsReporter} by a client created through
     * {@link StreamsKafkaClient#create} must carry the configured {@code client-id} tag.
     */
    @Test
    public void metricsShouldBeTaggedWithClientId() throws Exception {
        // METRICS is static: start from a clean slate so stale entries cannot satisfy the assertions.
        TestMetricsReporter.METRICS.clear();
        config.put("client.id", "some_client_id");
        config.put("metric.reporters", TestMetricsReporter.class.getName());

        final StreamsKafkaClient streamsKafkaClient = StreamsKafkaClient.create(new StreamsConfig(config));
        try {
            Assert.assertFalse(TestMetricsReporter.METRICS.isEmpty());
            for (final KafkaMetric kafkaMetric : TestMetricsReporter.METRICS.values()) {
                Assert.assertEquals("some_client_id", kafkaMetric.metricName().tags().get("client-id"));
            }
        } finally {
            // This client is not backed by the MockClient; release it instead of leaking it.
            streamsKafkaClient.close();
        }
    }

    /** A {@code null} response to the broker compatibility check must surface as a {@link StreamsException}. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnEmptyBrokerCompatibilityResponse() {
        kafkaClient.prepareResponse(null);
        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        streamsKafkaClient.checkBrokerCompatibility(false);
    }

    /** A response of an unexpected type (here a {@link ProduceResponse}) must surface as a {@link StreamsException}. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionWhenBrokerCompatibilityResponseInconsistent() {
        kafkaClient.prepareResponse(new ProduceResponse(Collections.emptyMap()));
        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        streamsKafkaClient.checkBrokerCompatibility(false);
    }

    /** With EOS disabled, a broker advertising only the PRODUCE API must be rejected. */
    @Test(expected = StreamsException.class)
    public void shouldRequireBrokerVersion0101OrHigherWhenEosDisabled() {
        kafkaClient.prepareResponse(new ApiVersionsResponse(Errors.NONE, Collections.singletonList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE))));
        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        streamsKafkaClient.checkBrokerCompatibility(false);
    }

    /** With EOS enabled, a broker advertising only the CREATE_TOPICS API must be rejected. */
    @Test(expected = StreamsException.class)
    public void shouldRequireBrokerVersions0110OrHigherWhenEosEnabled() {
        kafkaClient.prepareResponse(new ApiVersionsResponse(Errors.NONE, Collections.singletonList(new ApiVersionsResponse.ApiVersion(ApiKeys.CREATE_TOPICS))));
        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        streamsKafkaClient.checkBrokerCompatibility(true);
    }

    /** A {@code null} response to a metadata fetch must surface as a {@link StreamsException}. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnEmptyFetchMetadataResponse() {
        kafkaClient.prepareResponse(null);
        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        streamsKafkaClient.fetchMetadata();
    }

    /** A metadata fetch answered with an unexpected response type must surface as a {@link StreamsException}. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionWhenFetchMetadataResponseInconsistent() {
        kafkaClient.prepareResponse(new ProduceResponse(Collections.emptyMap()));
        final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient();
        streamsKafkaClient.fetchMetadata();
    }

    /**
     * Creates {@link #TOPIC} through the given client and asserts that the topic-level configs
     * sent in the {@link CreateTopicsRequest} equal {@code expectedConfigs}.
     *
     * @param streamsKafkaClient  client under test, backed by {@link #kafkaClient}
     * @param internalTopicConfig topic definition handed to {@code createTopics}
     * @param expectedConfigs     exact set of configs expected in the request for {@link #TOPIC}
     */
    private void verifyCorrectTopicConfigs(final StreamsKafkaClient streamsKafkaClient,
                                           final InternalTopicConfig internalTopicConfig,
                                           final Map<String, String> expectedConfigs) {
        final Map<String, String> requestedTopicConfigs = new HashMap<>();

        // The matcher doubles as a probe: it records the configs of the outgoing request.
        kafkaClient.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(final AbstractRequest body) {
                if (!(body instanceof CreateTopicsRequest)) {
                    return false;
                }
                final CreateTopicsRequest request = (CreateTopicsRequest) body;
                final Map<String, CreateTopicsRequest.TopicDetails> topics = request.topics();
                final CreateTopicsRequest.TopicDetails topicDetails = topics.get(TOPIC);
                requestedTopicConfigs.putAll(topicDetails.configs);
                return true;
            }
        }, new CreateTopicsResponse(Collections.singletonMap(TOPIC, ApiError.NONE)));

        streamsKafkaClient.createTopics(Collections.singletonMap(internalTopicConfig, 1), 1, 1L, metadata);

        Assert.assertThat(requestedTopicConfigs, CoreMatchers.equalTo(expectedConfigs));
    }

    /** Builds a {@link StreamsKafkaClient} from the current {@link #config}, wired to the mock network client. */
    private StreamsKafkaClient createStreamsKafkaClient() {
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        return new StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig), kafkaClient, reporters);
    }

    /**
     * {@link MetricsReporter} that mirrors every reported metric into the static {@link #METRICS}
     * map so tests can inspect them. The map is shared by all instances of this class.
     */
    public static class TestMetricsReporter implements MetricsReporter {
        static final Map<MetricName, KafkaMetric> METRICS = new HashMap<>();

        @Override
        public void configure(final Map<String, ?> configs) {
            // No configuration needed.
        }

        @Override
        public void init(final List<KafkaMetric> metrics) {
            for (final KafkaMetric metric : metrics) {
                metricChange(metric);
            }
        }

        @Override
        public void metricChange(final KafkaMetric metric) {
            METRICS.put(metric.metricName(), metric);
        }

        @Override
        public void metricRemoval(final KafkaMetric metric) {
            METRICS.remove(metric.metricName());
        }

        @Override
        public void close() {
            METRICS.clear();
        }
    }
}

