/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClusterConnectionStates;
import org.apache.kafka.clients.DefaultHostResolver;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.InFlightRequests;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CorrelationIdMismatchException;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class NetworkClient
implements KafkaClient {
    // Logger obtained from the LogContext handed to the constructor.
    private final Logger log;
    // Network layer used to send requests, poll for I/O, and close/wake up connections.
    private final Selectable selector;
    // Either the updater supplied by the caller or a DefaultMetadataUpdater wrapping the supplied Metadata.
    private final MetadataUpdater metadataUpdater;
    // Source of the random starting index used by leastLoadedNode().
    private final Random randOffset;
    // Per-node connection state (ready, disconnected, throttled, back-off, ...), keyed by node id string.
    private final ClusterConnectionStates connectionStates;
    // Requests that have been handed to the selector and are not yet completed, tracked per node.
    private final InFlightRequests inFlightRequests;
    // Socket buffer sizes as passed to the constructor (their use is outside the part of the file shown here).
    private final int socketSendBuffer;
    private final int socketReceiveBuffer;
    // Client id as passed to the constructor.
    private final String clientId;
    // Starts at 0; presumably the next correlation id to hand out -- the code using it is not visible here.
    private int correlation;
    // Also used by poll() as an upper bound on the selector poll timeout.
    private final int defaultRequestTimeoutMs;
    // The same value is also passed to ClusterConnectionStates in the constructor.
    private final long reconnectBackoffMs;
    // Long.MAX_VALUE when the caller uses a constructor without this parameter.
    private final long rebootstrapTriggerMs;
    private final MetadataRecoveryStrategy metadataRecoveryStrategy;
    // Clock used to timestamp disconnects and to refresh "now" after a selector poll.
    private final Time time;
    // Flag consulted in doSend() before trace-logging a missing version entry and in handleConnections().
    private final boolean discoverBrokerVersions;
    // Per-node API version information; consulted in doSend() to pick the request version.
    private final ApiVersions apiVersions;
    // Node id -> builder for an ApiVersionsRequest that still has to be sent to that node.
    private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<String, ApiVersionsRequest.Builder>();
    // Responses for requests that failed locally (version mismatch, client-requested disconnect);
    // they are delivered at the start of the next poll().
    private final List<ClientResponse> abortedSends = new LinkedList<ClientResponse>();
    // May be null; when present, the throttle time of every received response is recorded on it.
    private final Sensor throttleTimeSensor;
    // Lifecycle state: ACTIVE -> CLOSING -> CLOSED (see initiateClose() and close()).
    private final AtomicReference<State> state;
    // Null when no ClientTelemetrySender was supplied to the constructor.
    private final TelemetrySender telemetrySender;

    /**
     * Convenience constructor for a client backed by a {@link Metadata} instance.
     * Delegates to the overload that takes {@code rebootstrapTriggerMs}, passing
     * {@code Long.MAX_VALUE} for it.
     */
    public NetworkClient(Selectable selector, Metadata metadata, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext, MetadataRecoveryStrategy metadataRecoveryStrategy) {
        this(selector, metadata, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, time, discoverBrokerVersions, apiVersions, logContext, Long.MAX_VALUE, metadataRecoveryStrategy);
    }

    /**
     * Convenience constructor for a client backed by a {@link Metadata} instance with an
     * explicit {@code rebootstrapTriggerMs}. Delegates to the full constructor with no custom
     * metadata updater, no throttle-time sensor, a {@link DefaultHostResolver} and no
     * telemetry sender.
     */
    public NetworkClient(Selectable selector, Metadata metadata, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext, long rebootstrapTriggerMs, MetadataRecoveryStrategy metadataRecoveryStrategy) {
        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, time, discoverBrokerVersions, apiVersions, null, logContext, new DefaultHostResolver(), null, rebootstrapTriggerMs, metadataRecoveryStrategy);
    }

    /**
     * Convenience constructor for a client backed by a {@link Metadata} instance that also
     * records throttle times on {@code throttleTimeSensor}. Delegates to the full constructor
     * with no custom metadata updater, a {@link DefaultHostResolver}, no telemetry sender and
     * {@code Long.MAX_VALUE} as {@code rebootstrapTriggerMs}.
     */
    public NetworkClient(Selectable selector, Metadata metadata, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext, MetadataRecoveryStrategy metadataRecoveryStrategy) {
        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, time, discoverBrokerVersions, apiVersions, throttleTimeSensor, logContext, new DefaultHostResolver(), null, Long.MAX_VALUE, metadataRecoveryStrategy);
    }

    /**
     * Convenience constructor for a client driven by a caller-supplied {@link MetadataUpdater}
     * instead of a {@link Metadata} instance. Delegates to the full constructor with a null
     * {@code metadata}, no throttle-time sensor, a {@link DefaultHostResolver}, no telemetry
     * sender and {@code Long.MAX_VALUE} as {@code rebootstrapTriggerMs}.
     */
    public NetworkClient(Selectable selector, MetadataUpdater metadataUpdater, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext, MetadataRecoveryStrategy metadataRecoveryStrategy) {
        this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, time, discoverBrokerVersions, apiVersions, null, logContext, new DefaultHostResolver(), null, Long.MAX_VALUE, metadataRecoveryStrategy);
    }

    /**
     * Primary constructor; the other constructors in this class delegate to it.
     *
     * @param metadataUpdater       updater to use; when {@code null} a {@code DefaultMetadataUpdater}
     *                              wrapping {@code metadata} is created instead
     * @param metadata              required only when {@code metadataUpdater} is {@code null}
     * @param throttleTimeSensor    may be {@code null}
     * @param clientTelemetrySender may be {@code null}, in which case no telemetry sender is created
     * @throws IllegalArgumentException if both {@code metadataUpdater} and {@code metadata} are {@code null}
     */
    public NetworkClient(MetadataUpdater metadataUpdater, Metadata metadata, Selectable selector, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext, HostResolver hostResolver, ClientTelemetrySender clientTelemetrySender, long rebootstrapTriggerMs, MetadataRecoveryStrategy metadataRecoveryStrategy) {
        // Resolve the metadata updater first so that invalid arguments fail before anything else is set up.
        if (metadataUpdater != null) {
            this.metadataUpdater = metadataUpdater;
        } else if (metadata != null) {
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        } else {
            throw new IllegalArgumentException("`metadata` must not be null");
        }

        // Values and collaborators taken over verbatim from the caller.
        this.selector = selector;
        this.clientId = clientId;
        this.time = time;
        this.apiVersions = apiVersions;
        this.throttleTimeSensor = throttleTimeSensor;
        this.discoverBrokerVersions = discoverBrokerVersions;
        this.socketSendBuffer = socketSendBuffer;
        this.socketReceiveBuffer = socketReceiveBuffer;
        this.defaultRequestTimeoutMs = defaultRequestTimeoutMs;
        this.reconnectBackoffMs = reconnectBackoffMs;
        this.rebootstrapTriggerMs = rebootstrapTriggerMs;
        this.metadataRecoveryStrategy = metadataRecoveryStrategy;

        // Internally owned helpers.
        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext, hostResolver);
        this.randOffset = new Random();
        this.log = logContext.logger(NetworkClient.class);

        // Mutable runtime state.
        this.correlation = 0;
        this.state = new AtomicReference<>(State.ACTIVE);

        // The telemetry wrapper is created last, once every other field has been assigned.
        if (clientTelemetrySender != null) {
            this.telemetrySender = new TelemetrySender(clientTelemetrySender);
        } else {
            this.telemetrySender = null;
        }
    }

    /**
     * Reports whether requests can currently be sent to {@code node}, starting a connection
     * attempt when the node is not ready and the connection state allows one.
     *
     * @param node the target node; must not be an empty node
     * @param now  current time in milliseconds
     * @return {@code true} if the node is ready right now, {@code false} otherwise
     * @throws IllegalArgumentException if {@code node} is empty
     */
    @Override
    public boolean ready(Node node, long now) {
        if (node.isEmpty()) {
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
        }
        boolean alreadyReady = isReady(node, now);
        if (!alreadyReady && connectionStates.canConnect(node.idString(), now)) {
            // Not ready, but a (re)connect is permitted: kick it off; the caller must retry later.
            initiateConnect(node, now);
        }
        return alreadyReady;
    }

    /**
     * Asks the connection-state tracker whether a connection to {@code node} may be initiated at {@code now}.
     */
    boolean canConnect(Node node, long now) {
        String nodeId = node.idString();
        return connectionStates.canConnect(nodeId, now);
    }

    /**
     * Disconnects from the given node on the caller's request. In-flight requests to the node
     * are cancelled and their responses queued in {@code abortedSends}, which the next
     * {@code poll} delivers. Does nothing (apart from a debug log) if the node is already
     * disconnected.
     *
     * @param nodeId id of the node to disconnect from
     */
    @Override
    public void disconnect(String nodeId) {
        boolean alreadyDisconnected = connectionStates.isDisconnected(nodeId);
        if (!alreadyDisconnected) {
            log.info("Client requested disconnect from node {}", nodeId);
            selector.close(nodeId);
            long disconnectTimeMs = time.milliseconds();
            cancelInFlightRequests(nodeId, disconnectTimeMs, abortedSends, false);
            connectionStates.disconnected(nodeId, disconnectTimeMs);
        } else {
            log.debug("Client requested disconnect from node {}, which is already disconnected", nodeId);
        }
    }

    /**
     * Removes every in-flight request for {@code nodeId} and reports each one as failed.
     * <ul>
     *   <li>Caller-issued requests get a timed-out or disconnected response appended to
     *       {@code responses} (skipped when {@code responses} is {@code null}).</li>
     *   <li>Internal METADATA requests are reported to the metadata updater.</li>
     *   <li>Internal telemetry requests are reported to the telemetry sender, if there is one.</li>
     * </ul>
     *
     * @param nodeId    node whose in-flight requests are cancelled
     * @param now       current time in milliseconds
     * @param responses sink for the generated responses; may be {@code null}
     * @param timedOut  {@code true} to mark the responses as timed out rather than disconnected
     */
    private void cancelInFlightRequests(String nodeId, long now, Collection<ClientResponse> responses, boolean timedOut) {
        for (InFlightRequest cancelled : inFlightRequests.clearAll(nodeId)) {
            ApiKeys apiKey = cancelled.header.apiKey();

            // The request payload is only included when debug logging is on.
            if (log.isDebugEnabled()) {
                log.debug("Cancelled in-flight {} request with correlation id {} due to node {} being disconnected (elapsed time since creation: {}ms, elapsed time since send: {}ms, throttle time: {}ms, request timeout: {}ms): {}", apiKey, cancelled.header.correlationId(), nodeId, cancelled.timeElapsedSinceCreateMs(now), cancelled.timeElapsedSinceSendMs(now), cancelled.throttleTimeMs(), cancelled.requestTimeoutMs, cancelled.request);
            } else {
                log.info("Cancelled in-flight {} request with correlation id {} due to node {} being disconnected (elapsed time since creation: {}ms, elapsed time since send: {}ms, throttle time: {}ms, request timeout: {}ms)", apiKey, cancelled.header.correlationId(), nodeId, cancelled.timeElapsedSinceCreateMs(now), cancelled.timeElapsedSinceSendMs(now), cancelled.throttleTimeMs(), cancelled.requestTimeoutMs);
            }

            if (cancelled.isInternalRequest) {
                if (apiKey == ApiKeys.METADATA) {
                    metadataUpdater.handleFailedRequest(now, Optional.empty());
                } else if (isTelemetryApi(apiKey) && telemetrySender != null) {
                    telemetrySender.handleFailedRequest(apiKey, null);
                }
            } else if (responses != null) {
                responses.add(timedOut ? cancelled.timedOut(now) : cancelled.disconnected(now));
            }
        }
    }

    /**
     * Closes the connection to the given node and forgets everything tracked for it:
     * in-flight requests are cancelled without producing responses, and the node's
     * connection state, API versions and pending ApiVersions fetch are removed.
     *
     * @param nodeId id of the node whose connection is closed
     */
    @Override
    public void close(String nodeId) {
        log.info("Client requested connection close from node {}", nodeId);
        selector.close(nodeId);
        // A null response sink means the cancelled requests are dropped silently.
        cancelInFlightRequests(nodeId, time.milliseconds(), null, false);
        connectionStates.remove(nodeId);
        apiVersions.remove(nodeId);
        nodesNeedingApiVersionsFetch.remove(nodeId);
    }

    /**
     * Returns the connection delay that the connection-state tracker reports for {@code node} at {@code now}.
     */
    @Override
    public long connectionDelay(Node node, long now) {
        String nodeId = node.idString();
        return connectionStates.connectionDelay(nodeId, now);
    }

    /**
     * Returns the throttle delay, in milliseconds, that the connection-state tracker reports
     * for {@code node} at {@code now}.
     */
    public long throttleDelayMs(Node node, long now) {
        String nodeId = node.idString();
        return connectionStates.throttleDelayMs(nodeId, now);
    }

    /**
     * Returns the poll delay, in milliseconds, that the connection-state tracker reports
     * for {@code node} at {@code now}.
     */
    @Override
    public long pollDelayMs(Node node, long now) {
        String nodeId = node.idString();
        return connectionStates.pollDelayMs(nodeId, now);
    }

    /**
     * Returns {@code true} if the connection-state tracker considers {@code node} disconnected.
     */
    @Override
    public boolean connectionFailed(Node node) {
        String nodeId = node.idString();
        return connectionStates.isDisconnected(nodeId);
    }

    /**
     * Returns the authentication exception the connection-state tracker holds for {@code node}.
     */
    @Override
    public AuthenticationException authenticationException(Node node) {
        String nodeId = node.idString();
        return connectionStates.authenticationException(nodeId);
    }

    /**
     * Returns {@code true} if a request can be sent to {@code node} right now. A node is never
     * reported ready while a metadata update is due.
     */
    @Override
    public boolean isReady(Node node, long now) {
        if (metadataUpdater.isUpdateDue(now)) {
            return false;
        }
        return canSendRequest(node.idString(), now);
    }

    /**
     * Checks the three conditions for sending to {@code node}: the connection state is ready,
     * the selector's channel is ready, and the in-flight limit has not been reached.
     */
    private boolean canSendRequest(String node, long now) {
        if (!connectionStates.isReady(node, now)) {
            return false;
        }
        if (!selector.isChannelReady(node)) {
            return false;
        }
        return inFlightRequests.canSendMore(node);
    }

    /**
     * Queues a caller-issued request for sending; it is treated as a non-internal request.
     *
     * @param request the request to send
     * @param now     current time in milliseconds
     */
    @Override
    public void send(ClientRequest request, long now) {
        final boolean internalRequest = false;
        doSend(request, internalRequest, now);
    }

    /**
     * Builds a client request from {@code builder} for {@code nodeConnectionId} and sends it
     * as an internal request.
     */
    void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
        ClientRequest metadataRequest = newClientRequest(nodeConnectionId, builder, now, true);
        final boolean internalRequest = true;
        doSend(metadataRequest, internalRequest, now);
    }

    /**
     * Picks the request version for the destination node, builds the request and hands it to
     * the wire-level {@code doSend} overload.
     *
     * <p>If no usable version exists, the request is not sent. Exactly one party is told about
     * the failure:
     * <ul>
     *   <li>a caller-issued request gets a failed response queued in {@code abortedSends}, which
     *       the next {@code poll} delivers;</li>
     *   <li>an internal METADATA request is reported to the metadata updater;</li>
     *   <li>an internal telemetry request is reported to the telemetry sender, if there is one.</li>
     * </ul>
     *
     * @param clientRequest     the request to send
     * @param isInternalRequest {@code true} for requests generated by this client itself; such
     *                          requests skip the readiness check
     * @param now               current time in milliseconds
     * @throws IllegalStateException if a caller-issued request targets a node that is not ready
     */
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        ensureActive();
        String nodeId = clientRequest.destination();
        if (!isInternalRequest && !canSendRequest(nodeId, now)) {
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        }
        AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
        try {
            NodeApiVersions versionInfo = apiVersions.get(nodeId);
            short version;
            if (versionInfo == null) {
                // Nothing known about the node's supported versions: use the builder's latest allowed version.
                version = builder.latestAllowedVersion();
                if (discoverBrokerVersions && log.isTraceEnabled()) {
                    log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
                }
            } else {
                version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
            }
            doSend(clientRequest, isInternalRequest, now, (AbstractRequest) builder.build(version));
        } catch (UnsupportedVersionException unsupportedVersionException) {
            log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder, clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
            ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, unsupportedVersionException, null, null);

            // The branches are mutually exclusive: the metadata updater and the telemetry sender
            // must only hear about failures of requests this client issued internally, never
            // about a caller's own METADATA/telemetry request.
            if (!isInternalRequest) {
                abortedSends.add(clientResponse);
            } else if (clientRequest.apiKey() == ApiKeys.METADATA) {
                metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
            } else if (isTelemetryApi(clientRequest.apiKey()) && telemetrySender != null) {
                telemetrySender.handleFailedRequest(clientRequest.apiKey(), unsupportedVersionException);
            }
        }
    }

    /**
     * Serialises an already built request, registers it as in flight and passes it to the selector.
     *
     * @param clientRequest     the originating client request
     * @param isInternalRequest whether the request was generated by this client itself
     * @param now               current time in milliseconds, recorded on the in-flight entry
     * @param request           the built request whose version determines the header
     */
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {
            log.debug("Sending {} request with header {} and timeout {} to node {}: {}", clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
        }
        Send send = request.toSend(header);
        // Track the request before handing the bytes to the selector.
        inFlightRequests.add(new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now));
        selector.send(new NetworkSend(clientRequest.destination(), send));
    }

    /**
     * Performs one round of network I/O and returns the responses that completed.
     *
     * <p>If locally aborted sends are pending they are completed and returned immediately,
     * without touching the selector. Otherwise the metadata updater and telemetry sender get a
     * chance to send their own requests, the selector is polled, and the outcome is processed:
     * completed sends and receives, disconnections, new connections, ApiVersions requests,
     * timed-out connections and requests, and rebootstrap.
     *
     * @param timeout maximum time to block in the selector, in milliseconds; the actual wait is
     *                further capped by the metadata and telemetry timeouts and the default
     *                request timeout
     * @param now     current time in milliseconds
     * @return the completed responses; their completion callbacks have already run
     */
    @Override
    public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        List<ClientResponse> responses = new ArrayList<>();
        if (!abortedSends.isEmpty()) {
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }

        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        long telemetryTimeout = Integer.MAX_VALUE;
        if (telemetrySender != null) {
            telemetryTimeout = telemetrySender.maybeUpdate(now);
        }
        long pollTimeout = Utils.min(timeout, metadataTimeout, telemetryTimeout, defaultRequestTimeoutMs);
        try {
            selector.poll(pollTimeout);
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // Time has passed while blocked in the selector, so re-read the clock for everything below.
        long updatedNow = time.milliseconds();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutConnections(responses, updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        handleRebootstrap(responses, updatedNow);
        completeResponses(responses);
        return responses;
    }

    /**
     * Invokes the completion hook of every response in order. An exception thrown by one
     * response is logged and does not stop the remaining responses from completing.
     */
    private void completeResponses(List<ClientResponse> responses) {
        Iterator<ClientResponse> pending = responses.iterator();
        while (pending.hasNext()) {
            ClientResponse current = pending.next();
            try {
                current.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    /**
     * Returns the number of in-flight requests across all nodes.
     */
    @Override
    public int inFlightRequestCount() {
        int total = inFlightRequests.count();
        return total;
    }

    /**
     * Returns {@code true} if at least one request is in flight to any node.
     */
    @Override
    public boolean hasInFlightRequests() {
        boolean nothingInFlight = inFlightRequests.isEmpty();
        return !nothingInFlight;
    }

    /**
     * Returns the number of in-flight requests for the given node.
     *
     * @param node id of the node
     */
    @Override
    public int inFlightRequestCount(String node) {
        int countForNode = inFlightRequests.count(node);
        return countForNode;
    }

    /**
     * Returns {@code true} if at least one request is in flight to the given node.
     *
     * @param node id of the node
     */
    @Override
    public boolean hasInFlightRequests(String node) {
        boolean nothingInFlight = inFlightRequests.isEmpty(node);
        return !nothingInFlight;
    }

    /**
     * Returns whether the connection-state tracker reports any ready node at {@code now}.
     */
    @Override
    public boolean hasReadyNodes(long now) {
        boolean anyReady = connectionStates.hasReadyNodes(now);
        return anyReady;
    }

    /**
     * Wakes up the underlying selector.
     */
    @Override
    public void wakeup() {
        selector.wakeup();
    }

    /**
     * Moves the client from ACTIVE to CLOSING. The selector is woken up only when this call
     * performed the transition; in any other state the call has no effect.
     */
    @Override
    public void initiateClose() {
        boolean startedClosing = state.compareAndSet(State.ACTIVE, State.CLOSING);
        if (startedClosing) {
            wakeup();
        }
    }

    /**
     * Returns {@code true} while the client is in the ACTIVE state.
     */
    @Override
    public boolean active() {
        return State.ACTIVE == state.get();
    }

    /**
     * Guards operations that require an active client.
     *
     * @throws DisconnectException if the client is not in the ACTIVE state
     */
    private void ensureActive() {
        if (active()) {
            return;
        }
        throw new DisconnectException("NetworkClient is no longer active, state is " + state);
    }

    /**
     * Closes the client: the selector, the metadata updater and, if present, the telemetry
     * sender. Only the call that moves the state to CLOSED releases these; any later call
     * just logs a warning.
     */
    @Override
    public void close() {
        // Make sure the state is CLOSING even if initiateClose() was never called.
        state.compareAndSet(State.ACTIVE, State.CLOSING);
        if (!state.compareAndSet(State.CLOSING, State.CLOSED)) {
            log.warn("Attempting to close NetworkClient that has already been closed.");
            return;
        }
        selector.close();
        metadataUpdater.close();
        if (telemetrySender != null) {
            telemetrySender.close();
        }
    }

    /**
     * Chooses the node with the fewest in-flight requests, scanning the known nodes from a
     * random starting position.
     *
     * <p>Preference order: a sendable node with no in-flight requests (returned immediately),
     * then the sendable node with the fewest in-flight requests, then a node whose connection
     * is being prepared, then the connectable node with the oldest last connect attempt. If
     * none qualifies the result carries a {@code null} node.
     *
     * @param now current time in milliseconds
     * @return the chosen node together with a flag telling whether any connection was ready
     * @throws IllegalStateException if the metadata updater knows no nodes
     */
    @Override
    public LeastLoadedNode leastLoadedNode(long now) {
        List<Node> nodes = metadataUpdater.fetchNodes();
        if (nodes.isEmpty()) {
            throw new IllegalStateException("There are no nodes in the Kafka cluster");
        }

        final int nodeCount = nodes.size();
        final int offset = randOffset.nextInt(nodeCount);

        int fewestInFlight = Integer.MAX_VALUE;
        Node foundReady = null;
        Node foundConnecting = null;
        Node foundCanConnect = null;
        boolean atLeastOneConnectionReady = false;

        for (int step = 0; step < nodeCount; step++) {
            Node candidate = nodes.get((offset + step) % nodeCount);
            String candidateId = candidate.idString();

            // Readiness of the connection alone, regardless of the in-flight limit.
            if (!atLeastOneConnectionReady) {
                atLeastOneConnectionReady = connectionStates.isReady(candidateId, now) && selector.isChannelReady(candidateId);
            }

            if (canSendRequest(candidateId, now)) {
                int currInflight = inFlightRequests.count(candidateId);
                if (currInflight == 0) {
                    // An idle, sendable node cannot be beaten: stop scanning.
                    log.trace("Found least loaded node {} connected with no in-flight requests", candidate);
                    return new LeastLoadedNode(candidate, true);
                }
                if (currInflight < fewestInFlight) {
                    fewestInFlight = currInflight;
                    foundReady = candidate;
                }
            } else if (connectionStates.isPreparingConnection(candidateId)) {
                foundConnecting = candidate;
            } else if (canConnect(candidate, now)) {
                // Keep the candidate whose last connect attempt lies furthest in the past.
                if (foundCanConnect == null || connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) > connectionStates.lastConnectAttemptMs(candidateId)) {
                    foundCanConnect = candidate;
                }
            } else {
                log.trace("Removing node {} from least loaded node selection since it is neither ready for sending or connecting", candidate);
            }
        }

        if (foundReady != null) {
            log.trace("Found least loaded node {} with {} inflight requests", foundReady, fewestInFlight);
            return new LeastLoadedNode(foundReady, atLeastOneConnectionReady);
        }
        if (foundConnecting != null) {
            log.trace("Found least loaded connecting node {}", foundConnecting);
            return new LeastLoadedNode(foundConnecting, atLeastOneConnectionReady);
        }
        if (foundCanConnect != null) {
            log.trace("Found least loaded node {} with no active connection", foundCanConnect);
            return new LeastLoadedNode(foundCanConnect, atLeastOneConnectionReady);
        }
        log.trace("Least loaded node selection failed to find an available node");
        return new LeastLoadedNode(null, atLeastOneConnectionReady);
    }

    /**
     * Parses a response buffer against the header of the request it answers, translating
     * low-level failures into {@link SchemaException}s.
     *
     * @param responseBuffer the raw response bytes
     * @param requestHeader  header of the request the response belongs to
     * @return the parsed response
     * @throws SchemaException if the buffer underflows, or if a request with a reserved (SASL)
     *                         correlation id received a response whose correlation id is not reserved
     * @throws CorrelationIdMismatchException for any other correlation id mismatch
     */
    public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
        try {
            return AbstractResponse.parseResponse(responseBuffer, requestHeader);
        } catch (BufferUnderflowException e) {
            throw new SchemaException("Buffer underflow while parsing response for request with header " + requestHeader, e);
        } catch (CorrelationIdMismatchException e) {
            boolean requestIdReserved = SaslClientAuthenticator.isReserved(requestHeader.correlationId());
            if (!requestIdReserved || SaslClientAuthenticator.isReserved(e.responseCorrelationId())) {
                // An ordinary mismatch: let the original exception propagate.
                throw e;
            }
            throw new SchemaException("The response is unrelated to Sasl request since its correlation id is " + e.responseCorrelationId() + " and the reserved range for Sasl request is [ 2147483640,2147483647]");
        }
    }

    /**
     * Handles a disconnection that was not caused by a timeout; cancelled requests are
     * reported as disconnected.
     */
    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
        final boolean timedOut = false;
        processDisconnection(responses, nodeId, now, disconnectState, timedOut);
    }

    /**
     * Handles a disconnection caused by a timeout: the channel state is LOCAL_CLOSE and
     * cancelled requests are reported as timed out.
     */
    private void processTimeoutDisconnection(List<ClientResponse> responses, String nodeId, long now) {
        final boolean timedOut = true;
        processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE, timedOut);
    }

    /**
     * Performs all bookkeeping for a lost connection: marks the node disconnected, drops its
     * API version information and pending ApiVersions fetch, logs according to the state the
     * channel was in, cancels the node's in-flight requests and informs the metadata updater.
     *
     * @param responses       sink for the responses of cancelled caller-issued requests
     * @param nodeId          id of the disconnected node
     * @param now             current time in milliseconds
     * @param disconnectState state of the channel when it was disconnected
     * @param timedOut        whether cancelled requests are reported as timed out
     */
    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState, boolean timedOut) {
        connectionStates.disconnected(nodeId, now);
        apiVersions.remove(nodeId);
        nodesNeedingApiVersionsFetch.remove(nodeId);

        switch (disconnectState.state()) {
            case AUTHENTICATION_FAILED:
                AuthenticationException authFailure = disconnectState.exception();
                connectionStates.authenticationFailed(nodeId, now, authFailure);
                log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId, disconnectState.remoteAddress(), authFailure.getMessage());
                break;
            case AUTHENTICATE:
                log.warn("Connection to node {} ({}) terminated during authentication. This may happen due to any of the following reasons: (1) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (2) Transient network issue.", nodeId, disconnectState.remoteAddress());
                break;
            case NOT_CONNECTED:
                log.warn("Connection to node {} ({}) could not be established. Node may not be available.", nodeId, disconnectState.remoteAddress());
                break;
            default:
                // No extra logging here for the remaining channel states.
                break;
        }

        cancelInFlightRequests(nodeId, now, responses, timedOut);
        metadataUpdater.handleServerDisconnect(now, nodeId, Optional.ofNullable(disconnectState.exception()));
    }

    /**
     * Closes the connection to every node that has a timed-out in-flight request and
     * processes the resulting disconnection as a timeout.
     *
     * @param responses sink for the responses of the cancelled requests
     * @param now       current time in milliseconds
     */
    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
        for (String timedOutNodeId : inFlightRequests.nodesWithTimedOutRequests(now)) {
            selector.close(timedOutNodeId);
            log.info("Disconnecting from node {} due to request timeout.", timedOutNodeId);
            processTimeoutDisconnection(responses, timedOutNodeId, now);
        }
    }

    /**
     * Moves every queued aborted-send response into {@code responses} and empties the queue.
     */
    private void handleAbortedSends(List<ClientResponse> responses) {
        for (ClientResponse aborted : abortedSends) {
            responses.add(aborted);
        }
        abortedSends.clear();
    }

    /**
     * Closes every connection whose setup has exceeded its connection setup timeout and
     * processes the resulting disconnection as a timeout.
     *
     * @param responses sink for the responses of the cancelled requests
     * @param now       current time in milliseconds
     */
    private void handleTimedOutConnections(List<ClientResponse> responses, long now) {
        for (String stalledNodeId : connectionStates.nodesWithConnectionSetupTimeout(now)) {
            selector.close(stalledNodeId);
            log.info("Disconnecting from node {} due to socket connection setup timeout. The timeout value is {} ms.", stalledNodeId, connectionStates.connectionSetupTimeoutMs(stalledNodeId));
            processTimeoutDisconnection(responses, stalledNodeId, now);
        }
    }

    /**
     * Completes requests that were fully sent and expect no response: they are removed from
     * the in-flight set and a response with a {@code null} body is added to {@code responses}.
     * Requests that do expect a response stay in flight.
     *
     * @param responses sink for the generated responses
     * @param now       current time in milliseconds
     */
    private void handleCompletedSends(List<ClientResponse> responses, long now) {
        for (NetworkSend completedSend : selector.completedSends()) {
            InFlightRequest lastSent = inFlightRequests.lastSent(completedSend.destinationId());
            if (!lastSent.expectResponse) {
                inFlightRequests.completeLastSent(completedSend.destinationId());
                responses.add(lastSent.completed(null, now));
            }
        }
    }

    /**
     * Applies client-side throttling when the response carries a positive throttle time and
     * asks for it at the given API version: the throttle time is added to the node's in-flight
     * requests and the connection is throttled until {@code now + throttleTimeMs}.
     *
     * @param response   the received response
     * @param apiVersion API version of the request the response answers
     * @param nodeId     id of the node that sent the response
     * @param now        current time in milliseconds
     */
    private void maybeThrottle(AbstractResponse response, short apiVersion, String nodeId, long now) {
        int throttleTimeMs = response.throttleTimeMs();
        if (throttleTimeMs <= 0 || !response.shouldClientThrottle(apiVersion)) {
            return;
        }
        long throttledUntilMs = now + (long) throttleTimeMs;
        inFlightRequests.incrementThrottleTime(nodeId, throttleTimeMs);
        connectionStates.throttle(nodeId, throttledUntilMs);
        log.trace("Connection to node {} is throttled for {} ms until timestamp {}", nodeId, throttleTimeMs, throttledUntilMs);
    }

    /**
     * Processes every response the selector has fully received: matches it to the oldest
     * in-flight request of its source node, parses it, records and applies throttling, and
     * then routes it. Responses to internal metadata, ApiVersions and telemetry requests are
     * consumed by the corresponding handler; everything else is added to {@code responses}.
     *
     * @param responses sink for responses that are handed back to the caller
     * @param now       current time in milliseconds
     */
    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : selector.completedReceives()) {
            InFlightRequest req = inFlightRequests.completeNext(receive.source());
            AbstractResponse body = NetworkClient.parseResponse(receive.payload(), req.header);

            if (throttleTimeSensor != null) {
                throttleTimeSensor.record(body.throttleTimeMs(), now);
            }
            if (log.isDebugEnabled()) {
                log.debug("Received {} response from node {} for request with header {}: {}", req.header.apiKey(), req.destination, req.header, body);
            }
            maybeThrottle(body, req.header.apiVersion(), req.destination, now);

            if (req.isInternalRequest && body instanceof MetadataResponse) {
                metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) body);
            } else if (req.isInternalRequest && body instanceof ApiVersionsResponse) {
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            } else if (req.isInternalRequest && body instanceof GetTelemetrySubscriptionsResponse) {
                telemetrySender.handleResponse((GetTelemetrySubscriptionsResponse) body);
            } else if (req.isInternalRequest && body instanceof PushTelemetryResponse) {
                telemetrySender.handleResponse((PushTelemetryResponse) body);
            } else {
                responses.add(req.completed(body, now));
            }
        }
    }

    /**
     * Handles the answer to an internal ApiVersions request.
     * <ul>
     *   <li>No error: the node's API versions are stored and the connection is marked ready.</li>
     *   <li>UNSUPPORTED_VERSION for a request version other than 0: a new ApiVersions request is
     *       scheduled, using the maximum ApiVersions version from the response, or 0 if the
     *       response does not list one.</li>
     *   <li>Any other error, or UNSUPPORTED_VERSION for version 0: the connection is closed and
     *       processed as a disconnection.</li>
     * </ul>
     *
     * @param responses           sink for responses produced by a resulting disconnection
     * @param req                 the in-flight ApiVersions request
     * @param now                 current time in milliseconds
     * @param apiVersionsResponse the parsed response
     */
    private void handleApiVersionsResponse(List<ClientResponse> responses, InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
        final String node = req.destination;
        final ApiVersionsResponseData data = apiVersionsResponse.data();
        final short errorCode = data.errorCode();

        if (errorCode == Errors.NONE.code()) {
            NodeApiVersions nodeVersionInfo = new NodeApiVersions(data.apiKeys(), data.supportedFeatures(), data.finalizedFeatures(), data.finalizedFeaturesEpoch());
            apiVersions.update(node, nodeVersionInfo);
            connectionStates.ready(node);
            log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.", node, data.finalizedFeaturesEpoch(), data.finalizedFeatures(), data.supportedFeatures(), data.zkMigrationReady(), nodeVersionInfo);
            return;
        }

        // Only an UNSUPPORTED_VERSION answer to a request above version 0 is worth retrying.
        boolean canRetryWithOtherVersion = req.request.version() != 0 && errorCode == Errors.UNSUPPORTED_VERSION.code();
        if (!canRetryWithOtherVersion) {
            log.warn("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting.", Errors.forCode(errorCode), node, req.header.correlationId());
            selector.close(node);
            processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
            return;
        }

        short maxApiVersion = 0;
        if (data.apiKeys().size() > 0) {
            ApiVersionsResponseData.ApiVersion advertised = data.apiKeys().find(ApiKeys.API_VERSIONS.id);
            if (advertised != null) {
                maxApiVersion = advertised.maxVersion();
            }
        }
        nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder(maxApiVersion));
    }

    /**
     * Logs and processes every disconnection currently reported by the selector.
     *
     * <p>Disconnections whose state is {@code ChannelState.EXPIRED} are logged at debug level as idle
     * connections; all others are logged at info level.
     *
     * @param responses list forwarded to {@code processDisconnection}
     * @param now timestamp forwarded to {@code processDisconnection}
     */
    private void handleDisconnections(List<ClientResponse> responses, long now) {
        for (Map.Entry<String, ChannelState> disconnect : this.selector.disconnected().entrySet()) {
            final String nodeId = disconnect.getKey();
            final ChannelState state = disconnect.getValue();
            if (state != ChannelState.EXPIRED) {
                this.log.info("Node {} disconnected.", nodeId);
            } else {
                this.log.debug("Idle connection to node {} disconnected.", nodeId);
            }
            this.processDisconnection(responses, nodeId, now, state);
        }
    }

    /**
     * Handles every connection the selector reports as newly connected.
     *
     * <p>When broker version discovery is enabled, a default ApiVersionsRequest builder is queued for the
     * node; otherwise the connection is marked ready immediately.
     */
    private void handleConnections() {
        for (String nodeId : this.selector.connected()) {
            if (!this.discoverBrokerVersions) {
                this.connectionStates.ready(nodeId);
                this.log.debug("Completed connection to node {}. Ready.", nodeId);
            } else {
                this.nodesNeedingApiVersionsFetch.put(nodeId, new ApiVersionsRequest.Builder());
                this.log.debug("Completed connection to node {}. Fetching API versions.", nodeId);
            }
        }
    }

    /**
     * Sends the queued ApiVersionsRequest for each node whose channel is ready and which can accept
     * another in-flight request; nodes that are not sendable yet stay queued.
     *
     * @param now timestamp used as the request creation time and passed to {@code doSend}
     */
    private void handleInitiateApiVersionRequests(long now) {
        Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> pending = this.nodesNeedingApiVersionsFetch.entrySet().iterator();
        while (pending.hasNext()) {
            Map.Entry<String, ApiVersionsRequest.Builder> candidate = pending.next();
            String nodeId = candidate.getKey();
            boolean sendable = this.selector.isChannelReady(nodeId) && this.inFlightRequests.canSendMore(nodeId);
            if (sendable) {
                this.log.debug("Initiating API versions fetch from node {}.", nodeId);
                // The state transition is made only at the point the request is actually handed to doSend.
                this.connectionStates.checkingApiVersions(nodeId);
                ClientRequest versionsRequest = this.newClientRequest(nodeId, candidate.getValue(), now, true);
                this.doSend(versionsRequest, true, now);
                // Remove through the iterator so the map can be modified while it is being traversed.
                pending.remove();
            }
        }
    }

    /**
     * Performs a client rebootstrap when the recovery strategy is {@code REBOOTSTRAP} and the metadata
     * updater reports that one is needed.
     *
     * <p>Every node returned by the updater has its selector channel closed; nodes that were connecting or
     * connected are additionally processed as {@code LOCAL_CLOSE} disconnections. Afterwards the updater is
     * asked to rebootstrap.
     *
     * @param responses list forwarded to {@code processDisconnection}
     * @param now timestamp used for the rebootstrap check and forwarded to the calls made here
     */
    private void handleRebootstrap(List<ClientResponse> responses, long now) {
        if (this.metadataRecoveryStrategy != MetadataRecoveryStrategy.REBOOTSTRAP || !this.metadataUpdater.needsRebootstrap(now, this.rebootstrapTriggerMs)) {
            return;
        }
        for (Node knownNode : this.metadataUpdater.fetchNodes()) {
            String connectionId = knownNode.idString();
            this.selector.close(connectionId);
            boolean wasActive = this.connectionStates.isConnecting(connectionId) || this.connectionStates.isConnected(connectionId);
            if (wasActive) {
                this.log.info("Disconnecting from node {} due to client rebootstrap.", connectionId);
                this.processDisconnection(responses, connectionId, now, ChannelState.LOCAL_CLOSE);
            }
        }
        this.metadataUpdater.rebootstrap(now);
    }

    /**
     * Starts a connection attempt to the given node.
     *
     * <p>The node is first recorded as connecting, then the selector is asked to connect to the address
     * currently held by the connection states. If an {@link IOException} is thrown, the node is recorded as
     * disconnected and the metadata updater is notified with no fatal exception.
     *
     * @param node the node to connect to
     * @param now timestamp recorded with the connection state changes
     */
    private void initiateConnect(Node node, long now) {
        final String connectionId = node.idString();
        try {
            this.connectionStates.connecting(connectionId, now, node.host());
            final InetAddress resolved = this.connectionStates.currentAddress(connectionId);
            this.log.debug("Initiating connection to node {} using address {}", node, resolved);
            final InetSocketAddress endpoint = new InetSocketAddress(resolved, node.port());
            this.selector.connect(connectionId, endpoint, this.socketSendBuffer, this.socketReceiveBuffer);
        }
        catch (IOException connectFailure) {
            this.log.warn("Error connecting to node {}", node, connectFailure);
            this.connectionStates.disconnected(connectionId, now);
            this.metadataUpdater.handleServerDisconnect(now, connectionId, Optional.empty());
        }
    }

    /**
     * Reports whether any node known to the metadata updater is currently in the connecting state.
     *
     * @return {@code true} if at least one node is connecting, {@code false} otherwise
     */
    private boolean isAnyNodeConnecting() {
        return this.metadataUpdater.fetchNodes()
            .stream()
            .anyMatch(candidate -> this.connectionStates.isConnecting(candidate.idString()));
    }

    /**
     * Tells whether the given API key is one of the two telemetry APIs.
     *
     * @param apiKey the API key to test
     * @return {@code true} for {@code GET_TELEMETRY_SUBSCRIPTIONS} or {@code PUSH_TELEMETRY}
     */
    private boolean isTelemetryApi(ApiKeys apiKey) {
        if (apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS) {
            return true;
        }
        return apiKey == ApiKeys.PUSH_TELEMETRY;
    }

    /**
     * Creates a client request that uses this client's default request timeout and has no completion
     * callback.
     *
     * @param nodeId the destination node id
     * @param requestBuilder builder for the request to send
     * @param createdTimeMs creation timestamp recorded on the request
     * @param expectResponse whether a response is expected
     * @return the new client request
     */
    @Override
    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, boolean expectResponse) {
        final RequestCompletionHandler noCallback = null;
        return this.newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, this.defaultRequestTimeoutMs, noCallback);
    }

    /**
     * Returns the next correlation id and advances the counter.
     *
     * <p>If the current counter value is reported as reserved by {@code SaslClientAuthenticator.isReserved},
     * the counter is first reset to {@link Integer#MIN_VALUE}.
     *
     * @return the correlation id to use for the next request
     */
    int nextCorrelationId() {
        if (SaslClientAuthenticator.isReserved(this.correlation)) {
            this.correlation = Integer.MIN_VALUE;
        }
        final int assigned = this.correlation;
        this.correlation = assigned + 1;
        return assigned;
    }

    /**
     * Exposes the node the telemetry sender is currently pinned to.
     *
     * @return the telemetry sender's sticky node, or {@code null} when none is set
     */
    Node telemetryConnectedNode() {
        final TelemetrySender sender = this.telemetrySender;
        return sender.stickyNode;
    }

    /**
     * Creates a client request carrying a freshly allocated correlation id and this client's id.
     *
     * @param nodeId the destination node id
     * @param requestBuilder builder for the request to send
     * @param createdTimeMs creation timestamp recorded on the request
     * @param expectResponse whether a response is expected
     * @param requestTimeoutMs timeout to record on the request
     * @param callback completion handler for the request; may be {@code null}
     * @return the new client request
     */
    @Override
    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, boolean expectResponse, int requestTimeoutMs, RequestCompletionHandler callback) {
        final int correlationId = this.nextCorrelationId();
        return new ClientRequest(nodeId, requestBuilder, correlationId, this.clientId, createdTimeMs, expectResponse, requestTimeoutMs, callback);
    }

    /**
     * Indicates whether this client requests API versions from a node after connecting to it.
     *
     * @return the value of the {@code discoverBrokerVersions} flag
     */
    public boolean discoverBrokerVersions() {
        final boolean enabled = this.discoverBrokerVersions;
        return enabled;
    }

    class DefaultMetadataUpdater
    implements MetadataUpdater {
        private final Metadata metadata;
        private InProgressData inProgress;
        private Optional<Long> metadataAttemptStartMs = Optional.empty();

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
            this.inProgress = null;
        }

        @Override
        public List<Node> fetchNodes() {
            return this.metadata.fetch().nodes();
        }

        @Override
        public boolean isUpdateDue(long now) {
            return !this.hasFetchInProgress() && this.metadata.timeToNextUpdate(now) == 0L;
        }

        private boolean hasFetchInProgress() {
            return this.inProgress != null;
        }

        @Override
        public long maybeUpdate(long now) {
            long waitForMetadataFetch;
            long timeToNextMetadataUpdate = this.metadata.timeToNextUpdate(now);
            long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch = this.hasFetchInProgress() ? (long)NetworkClient.this.defaultRequestTimeoutMs : 0L);
            if (metadataTimeout > 0L) {
                return metadataTimeout;
            }
            if (this.metadataAttemptStartMs.isEmpty()) {
                this.metadataAttemptStartMs = Optional.of(now);
            }
            LeastLoadedNode leastLoadedNode = NetworkClient.this.leastLoadedNode(now);
            if (NetworkClient.this.metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
                this.rebootstrap(now);
                leastLoadedNode = NetworkClient.this.leastLoadedNode(now);
            }
            if (leastLoadedNode.node() == null) {
                NetworkClient.this.log.debug("Give up sending metadata request since no node is available");
                return NetworkClient.this.reconnectBackoffMs;
            }
            return this.maybeUpdate(now, leastLoadedNode.node());
        }

        @Override
        public void handleServerDisconnect(long now, String destinationId, Optional<AuthenticationException> maybeFatalException) {
            int nodeId;
            Node node;
            Cluster cluster = this.metadata.fetch();
            if (cluster.isBootstrapConfigured() && (node = cluster.nodeById(nodeId = Integer.parseInt(destinationId))) != null) {
                NetworkClient.this.log.warn("Bootstrap broker {} disconnected", (Object)node);
            }
            if (this.isUpdateDue(now)) {
                this.handleFailedRequest(now, Optional.empty());
            }
            maybeFatalException.ifPresent(this.metadata::fatalError);
            this.metadata.requestUpdate(false);
        }

        @Override
        public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
            maybeFatalException.ifPresent(this.metadata::fatalError);
            this.metadata.failedUpdate(now);
            this.inProgress = null;
        }

        @Override
        public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
            Map<String, Errors> errors;
            List missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata -> topicMetadata.partitionMetadata().stream().filter(partitionMetadata -> partitionMetadata.error == Errors.LISTENER_NOT_FOUND).map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition()))).collect(Collectors.toList());
            if (!missingListenerPartitions.isEmpty()) {
                int count = missingListenerPartitions.size();
                NetworkClient.this.log.warn("{} partitions have leader brokers without a matching listener, including {}", (Object)count, (Object)missingListenerPartitions.subList(0, Math.min(10, count)));
            }
            if (!(errors = response.errors()).isEmpty()) {
                NetworkClient.this.log.warn("The metadata response from the cluster reported a recoverable issue with correlation id {} : {}", (Object)requestHeader.correlationId(), (Object)errors);
            }
            if (NetworkClient.this.metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && response.topLevelError() == Errors.REBOOTSTRAP_REQUIRED) {
                NetworkClient.this.log.info("Rebootstrap requested by server.");
                this.initiateRebootstrap();
            } else if (response.brokers().isEmpty()) {
                NetworkClient.this.log.trace("Ignoring empty metadata response with correlation id {}.", (Object)requestHeader.correlationId());
                this.metadata.failedUpdate(now);
            } else {
                this.metadata.update(this.inProgress.requestVersion, response, this.inProgress.isPartialUpdate, now);
                this.metadataAttemptStartMs = Optional.empty();
            }
            this.inProgress = null;
        }

        @Override
        public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
            return this.metadataAttemptStartMs.filter(startMs -> now - startMs > rebootstrapTriggerMs).isPresent();
        }

        @Override
        public void rebootstrap(long now) {
            this.metadata.rebootstrap();
            this.metadataAttemptStartMs = Optional.of(now);
        }

        @Override
        public void close() {
            this.metadata.close();
        }

        private void initiateRebootstrap() {
            this.metadataAttemptStartMs = Optional.of(0L);
        }

        private long maybeUpdate(long now, Node node) {
            String nodeConnectionId = node.idString();
            if (NetworkClient.this.canSendRequest(nodeConnectionId, now)) {
                Metadata.MetadataRequestAndVersion requestAndVersion = this.metadata.newMetadataRequestAndVersion(now);
                MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
                NetworkClient.this.log.debug("Sending metadata request {} to node {}", (Object)metadataRequest, (Object)node);
                NetworkClient.this.sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                this.inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
                return NetworkClient.this.defaultRequestTimeoutMs;
            }
            if (NetworkClient.this.isAnyNodeConnecting()) {
                return NetworkClient.this.reconnectBackoffMs;
            }
            if (NetworkClient.this.connectionStates.canConnect(nodeConnectionId, now)) {
                NetworkClient.this.log.debug("Initialize connection to node {} for sending metadata request", (Object)node);
                NetworkClient.this.initiateConnect(node, now);
                return NetworkClient.this.reconnectBackoffMs;
            }
            return Long.MAX_VALUE;
        }

        public class InProgressData {
            public final int requestVersion;
            public final boolean isPartialUpdate;

            private InProgressData(int requestVersion, boolean isPartialUpdate) {
                this.requestVersion = requestVersion;
                this.isPartialUpdate = isPartialUpdate;
            }
        }
    }

    private static enum State {
        ACTIVE,
        CLOSING,
        CLOSED;

    }

    class TelemetrySender {
        private final ClientTelemetrySender clientTelemetrySender;
        private Node stickyNode;

        public TelemetrySender(ClientTelemetrySender clientTelemetrySender) {
            this.clientTelemetrySender = clientTelemetrySender;
        }

        public long maybeUpdate(long now) {
            long timeToNextUpdate = this.clientTelemetrySender.timeToNextUpdate(NetworkClient.this.defaultRequestTimeoutMs);
            if (timeToNextUpdate > 0L) {
                return timeToNextUpdate;
            }
            if (this.stickyNode == null) {
                this.stickyNode = NetworkClient.this.leastLoadedNode(now).node();
                if (this.stickyNode == null) {
                    NetworkClient.this.log.debug("Give up sending telemetry request since no node is available");
                    return NetworkClient.this.reconnectBackoffMs;
                }
            }
            return this.maybeUpdate(now, this.stickyNode);
        }

        private long maybeUpdate(long now, Node node) {
            String nodeConnectionId = node.idString();
            if (NetworkClient.this.canSendRequest(nodeConnectionId, now)) {
                Optional<AbstractRequest.Builder<?>> requestOpt = this.clientTelemetrySender.createRequest();
                if (requestOpt.isEmpty()) {
                    return Long.MAX_VALUE;
                }
                AbstractRequest.Builder<?> request = requestOpt.get();
                ClientRequest clientRequest = NetworkClient.this.newClientRequest(nodeConnectionId, request, now, true);
                NetworkClient.this.doSend(clientRequest, true, now);
                return NetworkClient.this.defaultRequestTimeoutMs;
            }
            this.stickyNode = null;
            if (NetworkClient.this.isAnyNodeConnecting()) {
                return NetworkClient.this.reconnectBackoffMs;
            }
            if (NetworkClient.this.connectionStates.canConnect(nodeConnectionId, now)) {
                NetworkClient.this.log.debug("Initialize connection to node {} for sending telemetry request", (Object)node);
                NetworkClient.this.initiateConnect(node, now);
                return NetworkClient.this.reconnectBackoffMs;
            }
            return Long.MAX_VALUE;
        }

        public void handleResponse(GetTelemetrySubscriptionsResponse response) {
            this.clientTelemetrySender.handleResponse(response);
        }

        public void handleResponse(PushTelemetryResponse response) {
            this.clientTelemetrySender.handleResponse(response);
        }

        public void handleFailedRequest(ApiKeys apiKey, KafkaException maybeFatalException) {
            if (apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS) {
                this.clientTelemetrySender.handleFailedGetTelemetrySubscriptionsRequest(maybeFatalException);
            } else if (apiKey == ApiKeys.PUSH_TELEMETRY) {
                this.clientTelemetrySender.handleFailedPushTelemetryRequest(maybeFatalException);
            } else {
                throw new IllegalStateException("Invalid api key for failed telemetry request");
            }
        }

        public void close() {
            try {
                this.clientTelemetrySender.close();
            }
            catch (Exception exception) {
                NetworkClient.this.log.error("Failed to close client telemetry sender", exception);
            }
        }
    }

    /**
     * Bookkeeping record for a request handed to the network layer: its header, destination, callback,
     * timing information and accumulated throttle time, plus factories for the matching
     * {@code ClientResponse} in the completed, timed-out and disconnected cases.
     */
    static class InFlightRequest {
        final RequestHeader header;
        final String destination;
        final RequestCompletionHandler callback;
        final boolean expectResponse;
        final AbstractRequest request;
        final boolean isInternalRequest;
        final Send send;
        final long sendTimeMs;
        final long createdTimeMs;
        final long requestTimeoutMs;
        // Running total, increased through incrementThrottleTime.
        long throttleTimeMs;

        /**
         * Builds an in-flight record, taking timeout, creation time, destination, callback and the
         * expect-response flag from the given client request.
         */
        public InFlightRequest(ClientRequest clientRequest, RequestHeader header, boolean isInternalRequest, AbstractRequest request, Send send, long sendTimeMs) {
            this(header, clientRequest.requestTimeoutMs(), clientRequest.createdTimeMs(), clientRequest.destination(), clientRequest.callback(), clientRequest.expectResponse(), isInternalRequest, request, send, sendTimeMs);
        }

        /** Builds an in-flight record from explicitly supplied values. */
        public InFlightRequest(RequestHeader header, int requestTimeoutMs, long createdTimeMs, String destination, RequestCompletionHandler callback, boolean expectResponse, boolean isInternalRequest, AbstractRequest request, Send send, long sendTimeMs) {
            this.header = header;
            this.destination = destination;
            this.callback = callback;
            this.expectResponse = expectResponse;
            this.request = request;
            this.isInternalRequest = isInternalRequest;
            this.send = send;
            this.sendTimeMs = sendTimeMs;
            this.createdTimeMs = createdTimeMs;
            this.requestTimeoutMs = requestTimeoutMs;
        }

        /**
         * @param currentTimeMs the current timestamp
         * @return time since the request was sent, never negative
         */
        public long timeElapsedSinceSendMs(long currentTimeMs) {
            final long elapsed = currentTimeMs - this.sendTimeMs;
            return elapsed < 0L ? 0L : elapsed;
        }

        /**
         * @return the throttle time accumulated so far
         */
        public long throttleTimeMs() {
            return this.throttleTimeMs;
        }

        /**
         * @param currentTimeMs the current timestamp
         * @return time since the request was created, never negative
         */
        public long timeElapsedSinceCreateMs(long currentTimeMs) {
            final long elapsed = currentTimeMs - this.createdTimeMs;
            return elapsed < 0L ? 0L : elapsed;
        }

        /**
         * Creates the client response for a request that received the given response body.
         *
         * @param response the response body
         * @param timeMs timestamp recorded on the client response
         */
        public ClientResponse completed(AbstractResponse response, long timeMs) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, timeMs, false, null, null, response);
        }

        /**
         * Creates the body-less client response for a request that timed out.
         *
         * @param timeMs timestamp recorded on the client response
         */
        public ClientResponse timedOut(long timeMs) {
            // NOTE(review): the two `true` flags are presumably "disconnected" and "timed out" - confirm against ClientResponse.
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, timeMs, true, true, null, null, null);
        }

        /**
         * Creates the body-less client response for a request whose connection was lost.
         *
         * @param timeMs timestamp recorded on the client response
         */
        public ClientResponse disconnected(long timeMs) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, timeMs, true, null, null, null);
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("InFlightRequest(header=");
            text.append(this.header);
            text.append(", destination=").append(this.destination);
            text.append(", expectResponse=").append(this.expectResponse);
            text.append(", createdTimeMs=").append(this.createdTimeMs);
            text.append(", sendTimeMs=").append(this.sendTimeMs);
            text.append(", isInternalRequest=").append(this.isInternalRequest);
            text.append(", request=").append(this.request);
            text.append(", callback=").append(this.callback);
            text.append(", send=").append(this.send);
            text.append(")");
            return text.toString();
        }

        /**
         * Adds the given amount to the accumulated throttle time.
         *
         * @param throttleTimeMs the amount to add
         */
        public void incrementThrottleTime(long throttleTimeMs) {
            this.throttleTimeMs += throttleTimeMs;
        }
    }
}

