package com.linkedin.kafka.cruisecontrol.common;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.MonitorConfig;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/common/MetadataClient.class */
public class MetadataClient {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataClient.class);
    private static final LogContext LOG_CONTEXT = new LogContext();
    private static final int DEFAULT_MAX_IN_FLIGHT_REQUEST = 1;
    private final AtomicInteger _metadataGeneration = new AtomicInteger(0);
    private final Metadata _metadata;
    private final NetworkClient _networkClient;
    private final Time _time;
    private final long _metadataTTL;
    private final long _refreshMetadataTimeout;

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/common/MetadataClient$ClusterAndGeneration.class */
    public static class ClusterAndGeneration {
        private final Cluster _cluster;
        private final int _generation;

        public ClusterAndGeneration(Cluster cluster, int i) {
            this._cluster = cluster;
            this._generation = i;
        }

        public Cluster cluster() {
            return this._cluster;
        }

        public int generation() {
            return this._generation;
        }
    }

    public MetadataClient(KafkaCruiseControlConfig kafkaCruiseControlConfig, Metadata metadata, long j, Time time) {
        this._metadata = metadata;
        this._refreshMetadataTimeout = kafkaCruiseControlConfig.getLong(MonitorConfig.METADATA_MAX_AGE_CONFIG).longValue();
        this._time = time;
        Cluster bootstrap = Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(kafkaCruiseControlConfig.getList(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG), ClientDnsLookup.DEFAULT));
        this._metadata.update(-1, KafkaCruiseControlUtils.prepareMetadataResponse(bootstrap.nodes(), bootstrap.clusterResource().clusterId(), -1, Collections.emptyList()), false, time.milliseconds());
        this._networkClient = ((NetworkClientProvider) kafkaCruiseControlConfig.getConfiguredInstance(MonitorConfig.NETWORK_CLIENT_PROVIDER_CLASS_CONFIG, NetworkClientProvider.class)).createNetworkClient(kafkaCruiseControlConfig.getLong(MonitorConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG).longValue(), new Metrics(), time, "load-monitor", ClientUtils.createChannelBuilder(kafkaCruiseControlConfig, time, LOG_CONTEXT), this._metadata, kafkaCruiseControlConfig.getString(MonitorConfig.CLIENT_ID_CONFIG), 1, kafkaCruiseControlConfig.getLong(MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG).longValue(), kafkaCruiseControlConfig.getLong(MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG).longValue(), kafkaCruiseControlConfig.getInt(MonitorConfig.SEND_BUFFER_CONFIG).intValue(), kafkaCruiseControlConfig.getInt(MonitorConfig.RECEIVE_BUFFER_CONFIG).intValue(), kafkaCruiseControlConfig.getInt(MonitorConfig.REQUEST_TIMEOUT_MS_CONFIG).intValue(), true, new ApiVersions());
        this._metadataTTL = j;
        doRefreshMetadata(this._refreshMetadataTimeout);
    }

    public synchronized ClusterAndGeneration refreshMetadata() {
        return refreshMetadata(this._refreshMetadataTimeout);
    }

    private void doRefreshMetadata(long j) {
        boolean z;
        int requestUpdate = this._metadata.requestUpdate();
        long j2 = j;
        Cluster fetch = this._metadata.fetch();
        boolean z2 = this._metadata.updateVersion() > requestUpdate;
        while (true) {
            z = z2;
            if (z || j2 <= 0) {
                break;
            }
            this._metadata.requestUpdate();
            long milliseconds = this._time.milliseconds();
            this._networkClient.poll(j2, milliseconds);
            j2 -= this._time.milliseconds() - milliseconds;
            z2 = this._metadata.updateVersion() > requestUpdate;
        }
        if (!z) {
            LOG.warn("Failed to update metadata in {}ms. Using old metadata with version {} and last successful update {}.", new Object[]{Long.valueOf(j), Integer.valueOf(this._metadata.updateVersion()), Long.valueOf(this._metadata.lastSuccessfulUpdate())});
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updated metadata {}", this._metadata.fetch());
        }
        if (MonitorUtils.metadataChanged(fetch, this._metadata.fetch())) {
            this._metadataGeneration.incrementAndGet();
        }
    }

    public synchronized ClusterAndGeneration refreshMetadata(long j) {
        if (this._time.milliseconds() >= this._metadata.lastSuccessfulUpdate() + this._metadataTTL) {
            doRefreshMetadata(j);
        }
        return new ClusterAndGeneration(this._metadata.fetch(), this._metadataGeneration.get());
    }

    public synchronized void close() {
        this._networkClient.close();
    }

    public Metadata metadata() {
        return this._metadata;
    }

    public ClusterAndGeneration clusterAndGeneration() {
        return new ClusterAndGeneration(cluster(), this._metadataGeneration.get());
    }

    public Cluster cluster() {
        return this._metadata.fetch();
    }
}
