/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.common;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.NetworkClientProvider;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataClient {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataClient.class);
    private static final LogContext LOG_CONTEXT = new LogContext();
    private static final int DEFAULT_MAX_IN_FLIGHT_REQUEST = 1;
    private final AtomicInteger _metadataGeneration = new AtomicInteger(0);
    private final Metadata _metadata;
    private final NetworkClient _networkClient;
    private final Time _time;
    private final long _metadataTTL;
    private final long _refreshMetadataTimeout;

    public MetadataClient(KafkaCruiseControlConfig config, Metadata metadata, long metadataTTL, Time time) {
        this._metadata = metadata;
        this._refreshMetadataTimeout = config.getLong("metadata.max.age.ms");
        this._time = time;
        List addresses = ClientUtils.parseAndValidateAddresses((List)config.getList("bootstrap.servers"), (ClientDnsLookup)ClientDnsLookup.DEFAULT);
        Cluster bootstrapCluster = Cluster.bootstrap((List)addresses);
        MetadataResponse metadataResponse = KafkaCruiseControlUtils.prepareMetadataResponse(bootstrapCluster.nodes(), bootstrapCluster.clusterResource().clusterId(), -1, Collections.emptyList());
        this._metadata.update(-1, metadataResponse, false, time.milliseconds());
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder((AbstractConfig)config, (Time)time, (LogContext)LOG_CONTEXT);
        NetworkClientProvider provider = config.getConfiguredInstance("network.client.provider.class", NetworkClientProvider.class);
        this._networkClient = provider.createNetworkClient(config.getLong("connections.max.idle.ms"), new Metrics(), time, "load-monitor", channelBuilder, this._metadata, config.getString("client.id"), 1, config.getLong("reconnect.backoff.ms"), config.getLong("reconnect.backoff.ms"), config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"), config.getInt("request.timeout.ms"), true, new ApiVersions());
        this._metadataTTL = metadataTTL;
        this.doRefreshMetadata(this._refreshMetadataTimeout);
    }

    public synchronized ClusterAndGeneration refreshMetadata() {
        return this.refreshMetadata(this._refreshMetadataTimeout);
    }

    private void doRefreshMetadata(long timeout) {
        long start;
        boolean isMetadataUpdated;
        int updateVersion = this._metadata.requestUpdate();
        Cluster beforeUpdate = this._metadata.fetch();
        boolean bl = isMetadataUpdated = this._metadata.updateVersion() > updateVersion;
        for (long remaining = timeout; !isMetadataUpdated && remaining > 0L; remaining -= this._time.milliseconds() - start) {
            this._metadata.requestUpdate();
            start = this._time.milliseconds();
            this._networkClient.poll(remaining, start);
            isMetadataUpdated = this._metadata.updateVersion() > updateVersion;
        }
        if (isMetadataUpdated) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Updated metadata {}", (Object)this._metadata.fetch());
            }
            if (MonitorUtils.metadataChanged(beforeUpdate, this._metadata.fetch())) {
                this._metadataGeneration.incrementAndGet();
            }
        } else {
            LOG.warn("Failed to update metadata in {}ms. Using old metadata with version {} and last successful update {}.", new Object[]{timeout, this._metadata.updateVersion(), this._metadata.lastSuccessfulUpdate()});
        }
    }

    public synchronized ClusterAndGeneration refreshMetadata(long timeout) {
        if (this._time.milliseconds() >= this._metadata.lastSuccessfulUpdate() + this._metadataTTL) {
            this.doRefreshMetadata(timeout);
        }
        return new ClusterAndGeneration(this._metadata.fetch(), this._metadataGeneration.get());
    }

    public synchronized void close() {
        this._networkClient.close();
    }

    public Metadata metadata() {
        return this._metadata;
    }

    public ClusterAndGeneration clusterAndGeneration() {
        return new ClusterAndGeneration(this.cluster(), this._metadataGeneration.get());
    }

    public Cluster cluster() {
        return this._metadata.fetch();
    }

    public static class ClusterAndGeneration {
        private final Cluster _cluster;
        private final int _generation;

        public ClusterAndGeneration(Cluster cluster, int generation) {
            this._cluster = cluster;
            this._generation = generation;
        }

        public Cluster cluster() {
            return this._cluster;
        }

        public int generation() {
            return this._generation;
        }
    }
}

