/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;

public class StreamsKafkaClient {
    private static final ConfigDef CONFIG = StreamsConfig.configDef().withClientSslSupport().withClientSaslSupport();
    private final KafkaClient kafkaClient;
    private final List<MetricsReporter> reporters;
    private final Config streamsConfig;
    private static final int MAX_INFLIGHT_REQUESTS = 100;

    public StreamsKafkaClient(StreamsConfig streamsConfig) {
        this(Config.fromStreamsConfig(streamsConfig));
    }

    public StreamsKafkaClient(Config streamsConfig) {
        this.streamsConfig = streamsConfig;
        SystemTime time = new SystemTime();
        LinkedHashMap<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", "client.id");
        Metadata metadata = new Metadata(streamsConfig.getLong("retry.backoff.ms").longValue(), streamsConfig.getLong("metadata.max.age.ms").longValue());
        List addresses = ClientUtils.parseAndValidateAddresses((List)streamsConfig.getList("bootstrap.servers"));
        metadata.update(Cluster.bootstrap((List)addresses), time.milliseconds());
        MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt("metrics.num.samples").intValue()).timeWindow(streamsConfig.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS).tags(metricTags);
        this.reporters = streamsConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        this.reporters.add((MetricsReporter)new JmxReporter("kafka.admin"));
        Metrics metrics = new Metrics(metricConfig, this.reporters, (Time)time);
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder((Map)streamsConfig.values());
        Selector selector = new Selector(streamsConfig.getLong("connections.max.idle.ms").longValue(), metrics, (Time)time, "kafka-client", channelBuilder);
        this.kafkaClient = new NetworkClient((Selectable)selector, metadata, streamsConfig.getString("client.id"), 100, streamsConfig.getLong("reconnect.backoff.ms").longValue(), streamsConfig.getInt("send.buffer.bytes").intValue(), streamsConfig.getInt("receive.buffer.bytes").intValue(), streamsConfig.getInt("request.timeout.ms").intValue(), (Time)time, true);
    }

    public void close() throws IOException {
        for (MetricsReporter metricsReporter : this.reporters) {
            metricsReporter.close();
        }
    }

    public void createTopics(Map<InternalTopicConfig, Integer> topicsMap, int replicationFactor, long windowChangeLogAdditionalRetention, MetadataResponse metadata) {
        HashMap<String, CreateTopicsRequest.TopicDetails> topicRequestDetails = new HashMap<String, CreateTopicsRequest.TopicDetails>();
        for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
            Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
            HashMap<String, String> topicConfig = new HashMap<String, String>();
            for (String key : topicProperties.stringPropertyNames()) {
                topicConfig.put(key, topicProperties.getProperty(key));
            }
            CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(topicsMap.get(internalTopicConfig).intValue(), (short)replicationFactor, topicConfig);
            topicRequestDetails.put(internalTopicConfig.name(), topicDetails);
        }
        ClientRequest clientRequest = this.kafkaClient.newClientRequest(this.getControllerReadyBrokerId(metadata), (AbstractRequest.Builder)new CreateTopicsRequest.Builder(topicRequestDetails, this.streamsConfig.getInt("request.timeout.ms").intValue()), Time.SYSTEM.milliseconds(), true);
        ClientResponse clientResponse = this.sendRequest(clientRequest);
        if (!clientResponse.hasResponse()) {
            throw new StreamsException("Empty response for client request.");
        }
        if (!(clientResponse.responseBody() instanceof CreateTopicsResponse)) {
            throw new StreamsException("Inconsistent response type for internal topic creation request. Expected CreateTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
        }
        CreateTopicsResponse createTopicsResponse = (CreateTopicsResponse)clientResponse.responseBody();
        for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
            CreateTopicsResponse.Error error = (CreateTopicsResponse.Error)createTopicsResponse.errors().get(internalTopicConfig.name());
            if (error.is(Errors.NONE) || error.is(Errors.TOPIC_ALREADY_EXISTS)) continue;
            throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback());
        }
    }

    private String ensureOneNodeIsReady(List<Node> nodes) {
        String brokerId = null;
        long readyTimeout = Time.SYSTEM.milliseconds() + (long)this.streamsConfig.getInt("request.timeout.ms").intValue();
        boolean foundNode = false;
        while (!foundNode && Time.SYSTEM.milliseconds() < readyTimeout) {
            for (Node node : nodes) {
                if (!this.kafkaClient.ready(node, Time.SYSTEM.milliseconds())) continue;
                brokerId = Integer.toString(node.id());
                foundNode = true;
                break;
            }
            this.kafkaClient.poll(this.streamsConfig.getLong("poll.ms").longValue(), Time.SYSTEM.milliseconds());
        }
        if (brokerId == null) {
            throw new StreamsException("Could not find any available broker.");
        }
        return brokerId;
    }

    private String getControllerReadyBrokerId(MetadataResponse metadata) {
        return this.ensureOneNodeIsReady(Collections.singletonList(metadata.controller()));
    }

    private String getAnyReadyBrokerId() {
        Metadata metadata = new Metadata(this.streamsConfig.getLong("retry.backoff.ms").longValue(), this.streamsConfig.getLong("metadata.max.age.ms").longValue());
        List addresses = ClientUtils.parseAndValidateAddresses((List)this.streamsConfig.getList("bootstrap.servers"));
        metadata.update(Cluster.bootstrap((List)addresses), Time.SYSTEM.milliseconds());
        List nodes = metadata.fetch().nodes();
        return this.ensureOneNodeIsReady(nodes);
    }

    private ClientResponse sendRequest(ClientRequest clientRequest) {
        this.kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
        long responseTimeout = Time.SYSTEM.milliseconds() + (long)this.streamsConfig.getInt("request.timeout.ms").intValue();
        while (Time.SYSTEM.milliseconds() < responseTimeout) {
            List responseList = this.kafkaClient.poll(this.streamsConfig.getLong("poll.ms").longValue(), Time.SYSTEM.milliseconds());
            if (responseList.isEmpty()) continue;
            if (responseList.size() > 1) {
                throw new StreamsException("Sent one request but received multiple or no responses.");
            }
            ClientResponse response = (ClientResponse)responseList.get(0);
            if (response.requestHeader().correlationId() == clientRequest.correlationId()) {
                return response;
            }
            throw new StreamsException("Inconsistent response received from the broker " + clientRequest.destination() + ", expected correlation id " + clientRequest.correlationId() + ", but received " + response.requestHeader().correlationId());
        }
        throw new StreamsException("Failed to get response from broker within timeout");
    }

    public MetadataResponse fetchMetadata() {
        ClientRequest clientRequest = this.kafkaClient.newClientRequest(this.getAnyReadyBrokerId(), (AbstractRequest.Builder)new MetadataRequest.Builder(null), Time.SYSTEM.milliseconds(), true);
        ClientResponse clientResponse = this.sendRequest(clientRequest);
        if (!clientResponse.hasResponse()) {
            throw new StreamsException("Empty response for client request.");
        }
        if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
            throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
        }
        MetadataResponse metadataResponse = (MetadataResponse)clientResponse.responseBody();
        return metadataResponse;
    }

    public void checkBrokerCompatibility() throws StreamsException {
        ClientRequest clientRequest = this.kafkaClient.newClientRequest(this.getAnyReadyBrokerId(), (AbstractRequest.Builder)new ApiVersionsRequest.Builder(), Time.SYSTEM.milliseconds(), true);
        ClientResponse clientResponse = this.sendRequest(clientRequest);
        if (!clientResponse.hasResponse()) {
            throw new StreamsException("Empty response for client request.");
        }
        if (!(clientResponse.responseBody() instanceof ApiVersionsResponse)) {
            throw new StreamsException("Inconsistent response type for API versions request. Expected ApiVersionsResponse but received " + clientResponse.responseBody().getClass().getName());
        }
        ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse)clientResponse.responseBody();
        if (apiVersionsResponse.apiVersion(ApiKeys.CREATE_TOPICS.id) == null) {
            throw new StreamsException("Kafka Streams requires broker version 0.10.1.x or higher.");
        }
    }

    public static class Config
    extends AbstractConfig {
        public static Config fromStreamsConfig(StreamsConfig streamsConfig) {
            return new Config(streamsConfig.originals());
        }

        public Config(Map<?, ?> originals) {
            super(CONFIG, originals, false);
        }
    }
}

