/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.ApiVersions;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.ClientResponse;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.Metadata;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.NodeApiVersions;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.StaleMetadataException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.IsolationLevel;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.Node;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.errors.RetriableException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.errors.TimeoutException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.protocol.ApiKeys;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.protocol.Errors;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.LogContext;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.Time;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

public class OffsetFetcher {
    private final Logger log;
    private final ConsumerMetadata metadata;
    private final SubscriptionState subscriptions;
    private final ConsumerNetworkClient client;
    private final Time time;
    // Added to the current time to compute the next allowed retry after a failed or partially failed request.
    private final long retryBackoffMs;
    // Added to the current time and passed to subscriptions.setNextAllowedRetry just before a request is sent.
    private final long requestTimeoutMs;
    private final IsolationLevel isolationLevel;
    // Non-retriable failure recorded by the resetPositionsAsync listener; resetPositionsIfNeeded clears and rethrows it.
    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
    // Failure recorded via maybeSetOffsetForLeaderException; validatePositionsIfNeeded clears and rethrows it.
    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
    private final ApiVersions apiVersions;
    // Last metadata.updateVersion() observed by validatePositionsOnMetadataChange; -1 until the first observation.
    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

    /**
     * Creates an offset fetcher that sends its requests through the given network client.
     *
     * @param logContext       used to create this class's logger and passed on to the
     *                         {@link OffsetsForLeaderEpochClient} built here
     * @param client           network client used for all requests issued by this class
     * @param metadata         consumer metadata consulted for partition leaders and update versions
     * @param subscriptions    subscription state whose positions are reset and validated
     * @param time             clock used to compute retry deadlines
     * @param retryBackoffMs   milliseconds added to "now" when scheduling a retry after a failure
     * @param requestTimeoutMs milliseconds added to "now" when marking partitions as having a request in flight
     * @param isolationLevel   isolation level forwarded to ListOffsets requests
     * @param apiVersions      per-node API versions, used to decide whether offset validation is possible
     */
    public OffsetFetcher(LogContext logContext, ConsumerNetworkClient client, ConsumerMetadata metadata, SubscriptionState subscriptions, Time time, long retryBackoffMs, long requestTimeoutMs, IsolationLevel isolationLevel, ApiVersions apiVersions) {
        this.log = logContext.logger(this.getClass());

        // Collaborators.
        this.client = client;
        this.metadata = metadata;
        this.subscriptions = subscriptions;
        this.apiVersions = apiVersions;
        this.time = time;
        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);

        // Tunables.
        this.isolationLevel = isolationLevel;
        this.retryBackoffMs = retryBackoffMs;
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /**
     * Maps the reset strategy configured for a partition to the timestamp sentinel used in
     * ListOffsets lookups.
     *
     * @param partition partition whose reset strategy is looked up in the subscription state
     * @return {@code -2L} for {@code EARLIEST}, {@code -1L} for {@code LATEST}, or {@code null}
     *         for any other strategy (including none)
     */
    private Long offsetResetStrategyTimestamp(TopicPartition partition) {
        OffsetResetStrategy resetStrategy = this.subscriptions.resetStrategy(partition);
        // Deliberately an if/else chain rather than a ternary: mixing long literals with null in a
        // conditional expression would unbox and could throw.
        Long sentinel;
        if (resetStrategy == OffsetResetStrategy.EARLIEST) {
            sentinel = -2L;
        } else if (resetStrategy == OffsetResetStrategy.LATEST) {
            sentinel = -1L;
        } else {
            sentinel = null;
        }
        return sentinel;
    }

    /**
     * Inverse of {@code offsetResetStrategyTimestamp}: turns a timestamp sentinel back into the
     * reset strategy it stands for.
     *
     * @param timestamp timestamp that was sent in a ListOffsets request
     * @return {@code EARLIEST} for {@code -2L}, {@code LATEST} for {@code -1L}, otherwise {@code null}
     */
    private OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
        return timestamp == -2L
                ? OffsetResetStrategy.EARLIEST
                : (timestamp == -1L ? OffsetResetStrategy.LATEST : null);
    }

    /**
     * Starts asynchronous offset resets for every partition the subscription state currently
     * reports as needing one.
     *
     * <p>If an earlier asynchronous reset recorded a failure, that failure is cleared and thrown
     * before anything else happens.
     *
     * @throws RuntimeException the cached failure from a previous reset request, if any
     */
    public void resetPositionsIfNeeded() {
        RuntimeException pendingFailure = this.cachedListOffsetsException.getAndSet(null);
        if (pendingFailure != null) {
            throw pendingFailure;
        }

        Set<TopicPartition> needingReset = this.subscriptions.partitionsNeedingReset(this.time.milliseconds());
        if (!needingReset.isEmpty()) {
            Map<TopicPartition, Long> resetTimestampByPartition = new HashMap<>();
            needingReset.forEach(tp -> {
                // Partitions whose strategy has no timestamp sentinel are left out of the request.
                Long resetTimestamp = this.offsetResetStrategyTimestamp(tp);
                if (resetTimestamp != null) {
                    resetTimestampByPartition.put(tp, resetTimestamp);
                }
            });
            this.resetPositionsAsync(resetTimestampByPartition);
        }
    }

    /**
     * Starts asynchronous position validation for every partition the subscription state reports
     * as needing it and that currently has a position.
     *
     * <p>If an earlier validation recorded a failure, that failure is cleared and thrown first.
     * Otherwise positions are re-checked against the current metadata version before the
     * partitions to validate are collected.
     *
     * @throws RuntimeException the cached failure from a previous validation request, if any
     */
    public void validatePositionsIfNeeded() {
        RuntimeException pendingFailure = this.cachedOffsetForLeaderException.getAndSet(null);
        if (pendingFailure != null) {
            throw pendingFailure;
        }

        this.validatePositionsOnMetadataChange();

        Collection<TopicPartition> candidates = this.subscriptions.partitionsNeedingValidation(this.time.milliseconds());
        Map<TopicPartition, SubscriptionState.FetchPosition> positionsToValidate = candidates.stream()
                .filter((TopicPartition tp) -> this.subscriptions.position(tp) != null)
                .collect(Collectors.toMap(Function.identity(), this.subscriptions::position));
        this.validatePositionsAsync(positionsToValidate);
    }

    /**
     * Looks up, for each partition, the offset associated with the requested timestamp.
     *
     * <p>The topics of the requested partitions are registered as transient metadata topics for
     * the duration of the call and cleared again afterwards, even when the lookup throws.
     *
     * @param timestampsToSearch timestamp to search for, keyed by partition
     * @param timer              bounds how long the lookup may take
     * @return a map containing every requested partition; partitions for which no offset was
     *         fetched map to {@code null}
     */
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Timer timer) {
        this.metadata.addTransientTopics(this.topicsForPartitions(timestampsToSearch.keySet()));
        try {
            Map<TopicPartition, ListOffsetData> fetchedOffsets = this.fetchOffsetsByTimes(timestampsToSearch, timer, true).fetchedOffsets;
            Map<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size());
            // Seed every requested partition with null so callers can tell "asked but not found" apart.
            for (TopicPartition partition : timestampsToSearch.keySet()) {
                offsetsByTimes.put(partition, null);
            }
            for (Map.Entry<TopicPartition, ListOffsetData> entry : fetchedOffsets.entrySet()) {
                ListOffsetData offsetData = entry.getValue();
                offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(offsetData.offset, offsetData.timestamp, offsetData.leaderEpoch));
            }
            return offsetsByTimes;
        } finally {
            this.metadata.clearTransientTopics();
        }
    }

    /**
     * Repeatedly sends ListOffsets requests until every requested partition has been resolved or
     * the timer runs out.
     *
     * <p>After each successful round, partitions reported for retry stay in the working set and a
     * metadata update is awaited before the next round. Offsets fetched for currently assigned
     * partitions are also written to the subscription state: as last stable offset under
     * {@code READ_COMMITTED}, otherwise as high watermark.
     *
     * @param timestampsToSearch timestamp to search for, keyed by partition
     * @param timer              bounds the total time spent polling
     * @param requireTimestamps  forwarded to the request builder
     * @return the accumulated result; empty if {@code timestampsToSearch} is empty or the timer's
     *         {@code timeoutMs()} is zero
     * @throws TimeoutException if the lookup did not finish before the timer expired
     * @throws RuntimeException the underlying failure when a request fails with a non-retriable error
     */
    private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch, Timer timer, boolean requireTimestamps) {
        final ListOffsetResult result = new ListOffsetResult();
        if (timestampsToSearch.isEmpty()) {
            return result;
        }
        final Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
        do {
            final RequestFuture<ListOffsetResult> future = this.sendListOffsetsRequests(remainingToSearch, requireTimestamps);
            future.addListener(new RequestFutureListener<ListOffsetResult>() {

                @Override
                public void onSuccess(ListOffsetResult value) {
                    synchronized (future) {
                        result.fetchedOffsets.putAll(value.fetchedOffsets);
                        // Only partitions flagged for retry remain to be searched in the next round.
                        remainingToSearch.keySet().retainAll(value.partitionsToRetry);
                        for (Map.Entry<TopicPartition, ListOffsetData> entry : value.fetchedOffsets.entrySet()) {
                            TopicPartition partition = entry.getKey();
                            if (!OffsetFetcher.this.subscriptions.isAssigned(partition)) {
                                continue;
                            }
                            long offset = entry.getValue().offset;
                            if (OffsetFetcher.this.isolationLevel == IsolationLevel.READ_COMMITTED) {
                                OffsetFetcher.this.log.trace("Updating last stable offset for partition {} to {}", partition, offset);
                                OffsetFetcher.this.subscriptions.updateLastStableOffset(partition, offset);
                            } else {
                                OffsetFetcher.this.log.trace("Updating high watermark for partition {} to {}", partition, offset);
                                OffsetFetcher.this.subscriptions.updateHighWatermark(partition, offset);
                            }
                        }
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // Retriable failures are left for the surrounding loop to retry.
                    if (!(e instanceof RetriableException)) {
                        throw future.exception();
                    }
                }
            });
            // A timer whose timeoutMs() is zero returns right away without polling the network client.
            if (timer.timeoutMs() == 0L) {
                return result;
            }
            this.client.poll(future, timer);
            if (!future.isDone()) {
                break;
            }
            if (remainingToSearch.isEmpty()) {
                return result;
            }
            this.client.awaitMetadataUpdate(timer);
        } while (timer.notExpired());
        throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
    }

    /**
     * Fetches an offset for each given partition using the {@code -2L} timestamp sentinel, the
     * value this class pairs with {@code OffsetResetStrategy.EARLIEST}.
     *
     * @param partitions partitions to look up
     * @param timer      bounds how long the lookup may take
     * @return fetched offsets keyed by partition
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Timer timer) {
        final long earliestSentinel = -2L;
        return this.beginningOrEndOffset(partitions, earliestSentinel, timer);
    }

    /**
     * Fetches an offset for each given partition using the {@code -1L} timestamp sentinel, the
     * value this class pairs with {@code OffsetResetStrategy.LATEST}.
     *
     * @param partitions partitions to look up
     * @param timer      bounds how long the lookup may take
     * @return fetched offsets keyed by partition
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Timer timer) {
        final long latestSentinel = -1L;
        return this.beginningOrEndOffset(partitions, latestSentinel, timer);
    }

    /**
     * Shared implementation of {@code beginningOffsets} and {@code endOffsets}: searches every
     * given partition with the same timestamp sentinel and returns the bare offsets.
     *
     * <p>The partitions' topics are registered as transient metadata topics for the duration of
     * the call and cleared afterwards, even when the lookup throws.
     *
     * @param partitions partitions to look up; duplicates collapse into a single lookup
     * @param timestamp  timestamp sentinel to search for on every partition
     * @param timer      bounds how long the lookup may take
     * @return fetched offsets keyed by partition
     */
    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions, long timestamp, Timer timer) {
        this.metadata.addTransientTopics(this.topicsForPartitions(partitions));
        try {
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            for (TopicPartition tp : partitions) {
                timestampsToSearch.put(tp, timestamp);
            }

            ListOffsetResult lookup = this.fetchOffsetsByTimes(timestampsToSearch, timer, false);

            Map<TopicPartition, Long> offsetsByPartition = new HashMap<>();
            for (Map.Entry<TopicPartition, ListOffsetData> fetched : lookup.fetchedOffsets.entrySet()) {
                offsetsByPartition.put(fetched.getKey(), fetched.getValue().offset);
            }
            return offsetsByPartition;
        } finally {
            this.metadata.clearTransientTopics();
        }
    }

    /**
     * Applies a fetched offset as the new position of a partition that requested a reset.
     *
     * @param partition              partition being reset
     * @param requestedResetStrategy strategy the ListOffsets request was issued for; forwarded to
     *                               the subscription state together with the new position
     * @param offsetData             offset (and optional leader epoch) returned for the partition
     */
    void resetPositionIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
        // The leader is read before the last-seen epoch is updated below; keep this order.
        Metadata.LeaderAndEpoch leaderBeforeEpochUpdate = this.metadata.currentLeader(partition);
        // The position carries an empty epoch rather than the epoch from the response.
        SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(offsetData.offset, Optional.empty(), leaderBeforeEpochUpdate);

        if (offsetData.leaderEpoch.isPresent()) {
            this.metadata.updateLastSeenEpochIfNewer(partition, offsetData.leaderEpoch.get());
        }
        this.subscriptions.maybeSeekUnvalidated(partition, newPosition, requestedResetStrategy);
    }

    /**
     * Sends one ListOffsets request per leader node to resolve the given reset timestamps and
     * applies the results to the subscription state when the responses arrive.
     *
     * <p>Before each request is sent, its partitions get a next-allowed-retry time of
     * now + {@code requestTimeoutMs}. On failure, or for partitions the response flags for retry,
     * the partitions are marked failed with a backoff of {@code retryBackoffMs} and a metadata
     * update is requested. A non-retriable failure is cached so that
     * {@code resetPositionsIfNeeded} can throw it on its next invocation; if a failure is already
     * cached the new one is only logged.
     *
     * @param partitionResetTimestamps timestamp sentinel to search for, keyed by partition
     */
    private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
        // Partitions that cannot be sent right now are collected into a throwaway set.
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode = this.groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
            Node node = entry.getKey();
            final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps = entry.getValue();
            this.subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), this.time.milliseconds() + this.requestTimeoutMs);
            RequestFuture<ListOffsetResult> future = this.sendListOffsetRequest(node, resetTimestamps, false);
            future.addListener(new RequestFutureListener<ListOffsetResult>() {

                @Override
                public void onSuccess(ListOffsetResult result) {
                    if (!result.partitionsToRetry.isEmpty()) {
                        OffsetFetcher.this.subscriptions.requestFailed(result.partitionsToRetry, OffsetFetcher.this.time.milliseconds() + OffsetFetcher.this.retryBackoffMs);
                        OffsetFetcher.this.metadata.requestUpdate();
                    }
                    for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
                        TopicPartition partition = fetchedOffset.getKey();
                        ListOffsetData offsetData = fetchedOffset.getValue();
                        // Recover the strategy from the timestamp that was actually requested.
                        ListOffsetsRequestData.ListOffsetsPartition requestedReset = resetTimestamps.get(partition);
                        OffsetFetcher.this.resetPositionIfNeeded(partition, OffsetFetcher.this.timestampToOffsetResetStrategy(requestedReset.timestamp()), offsetData);
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    OffsetFetcher.this.subscriptions.requestFailed(resetTimestamps.keySet(), OffsetFetcher.this.time.milliseconds() + OffsetFetcher.this.retryBackoffMs);
                    OffsetFetcher.this.metadata.requestUpdate();
                    if (!(e instanceof RetriableException) && !OffsetFetcher.this.cachedListOffsetsException.compareAndSet(null, e)) {
                        OffsetFetcher.this.log.error("Discarding error in ListOffsetResponse because another error is pending", e);
                    }
                }
            });
        }
    }

    /**
     * Tells whether a node advertises a version of the OffsetsForLeaderEpoch API that this
     * client can use for position validation.
     *
     * @param nodeApiVersions API versions advertised by the node
     * @return {@code false} if the node does not list the API at all; otherwise whatever
     *         {@code OffsetsForLeaderEpochRequest.supportsTopicPermission} reports for the
     *         node's maximum supported version
     */
    static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
        ApiVersionsResponseData.ApiVersion advertised = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
        return advertised != null && OffsetsForLeaderEpochRequest.supportsTopicPermission(advertised.maxVersion());
    }

    /**
     * Groups the given positions by leader node and sends one OffsetsForLeaderEpoch request per
     * node to validate them.
     *
     * <p>Per node:
     * <ul>
     *   <li>an empty node only triggers a metadata update request;</li>
     *   <li>a node whose API versions are not yet known only gets a connection attempt;</li>
     *   <li>a node without a usable OffsetsForLeaderEpoch version has validation completed for
     *       all of its partitions without a request;</li>
     *   <li>otherwise the request is sent, and detected truncations or non-retriable failures are
     *       cached for {@code validatePositionsIfNeeded} to throw on its next invocation.</li>
     * </ul>
     *
     * @param partitionsToValidate fetch positions to validate, keyed by partition
     */
    private void validatePositionsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
        Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped = this.regroupFetchPositionsByLeader(partitionsToValidate);
        long nextResetTimeMs = this.time.milliseconds() + this.requestTimeoutMs;
        regrouped.forEach((node, fetchPositions) -> {
            if (node.isEmpty()) {
                this.metadata.requestUpdate();
                return;
            }
            NodeApiVersions nodeApiVersions = this.apiVersions.get(node.idString());
            if (nodeApiVersions == null) {
                this.client.tryConnect(node);
                return;
            }
            if (!OffsetFetcher.hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                this.log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not support the required protocol version (introduced in Kafka 2.3)", fetchPositions.keySet());
                for (TopicPartition partition : fetchPositions.keySet()) {
                    this.subscriptions.completeValidation(partition);
                }
                return;
            }
            this.subscriptions.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs);
            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = this.offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
            future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {

                @Override
                public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
                    List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                    if (!offsetsResult.partitionsToRetry().isEmpty()) {
                        OffsetFetcher.this.subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), OffsetFetcher.this.time.milliseconds() + OffsetFetcher.this.retryBackoffMs);
                        OffsetFetcher.this.metadata.requestUpdate();
                    }
                    // Let the subscription state compare each returned end offset with the position
                    // that was sent, and collect whatever truncations it reports.
                    offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
                        SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition);
                        Optional<SubscriptionState.LogTruncation> truncationOpt = OffsetFetcher.this.subscriptions.maybeCompleteValidation(topicPartition, requestPosition, respEndOffset);
                        truncationOpt.ifPresent(truncations::add);
                    });
                    if (!truncations.isEmpty()) {
                        OffsetFetcher.this.maybeSetOffsetForLeaderException(OffsetFetcher.this.buildLogTruncationException(truncations));
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    OffsetFetcher.this.subscriptions.requestFailed(fetchPositions.keySet(), OffsetFetcher.this.time.milliseconds() + OffsetFetcher.this.retryBackoffMs);
                    OffsetFetcher.this.metadata.requestUpdate();
                    if (!(e instanceof RetriableException)) {
                        OffsetFetcher.this.maybeSetOffsetForLeaderException(e);
                    }
                }
            });
        });
    }

    /**
     * Builds the exception that reports a set of detected log truncations.
     *
     * @param truncations truncations reported by the subscription state
     * @return an exception carrying, per truncated partition, the fetch offset that was in use
     *         and, where one is known, the divergent offset
     */
    private LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
        Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
        Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new HashMap<>();
        for (SubscriptionState.LogTruncation truncation : truncations) {
            if (truncation.divergentOffsetOpt.isPresent()) {
                divergentOffsets.put(truncation.topicPartition, truncation.divergentOffsetOpt.get());
            }
            truncatedFetchOffsets.put(truncation.topicPartition, truncation.fetchPosition.offset);
        }
        String message = "Detected truncated partitions: " + truncations;
        return new LogTruncationException(message, truncatedFetchOffsets, divergentOffsets);
    }

    /**
     * Caches a validation failure for {@code validatePositionsIfNeeded} to throw later, unless a
     * failure is already cached, in which case the new one is logged and dropped.
     *
     * @param e failure to cache
     */
    private void maybeSetOffsetForLeaderException(RuntimeException e) {
        boolean cached = this.cachedOffsetForLeaderException.compareAndSet(null, e);
        if (cached) {
            return;
        }
        this.log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", (Throwable) e);
    }

    /**
     * Fans a timestamp search out to the leader of each partition and returns a single future
     * that aggregates all per-node responses.
     *
     * <p>The returned future completes once every per-node request has succeeded, carrying the
     * union of fetched offsets and of partitions to retry (including partitions that could not be
     * sent at all). It fails as soon as any per-node request fails.
     *
     * @param timestampsToSearch timestamp to search for, keyed by partition
     * @param requireTimestamps  forwarded to the request builder
     * @return the aggregate future; already failed with {@link StaleMetadataException} when no
     *         partition could be assigned to a node
     */
    private RequestFuture<ListOffsetResult> sendListOffsetsRequests(Map<TopicPartition, Long> timestampsToSearch, boolean requireTimestamps) {
        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode = this.groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
        if (timestampsToSearchByNode.isEmpty()) {
            return RequestFuture.failure(new StaleMetadataException());
        }
        final RequestFuture<ListOffsetResult> listOffsetRequestsFuture = new RequestFuture<>();
        final Map<TopicPartition, ListOffsetData> fetchedTimestampOffsets = new HashMap<>();
        final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
            RequestFuture<ListOffsetResult> future = this.sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
            future.addListener(new RequestFutureListener<ListOffsetResult>() {

                @Override
                public void onSuccess(ListOffsetResult partialResult) {
                    // The aggregate future doubles as the lock guarding the shared accumulators.
                    synchronized (listOffsetRequestsFuture) {
                        fetchedTimestampOffsets.putAll(partialResult.fetchedOffsets);
                        partitionsToRetry.addAll(partialResult.partitionsToRetry);
                        if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone()) {
                            ListOffsetResult result = new ListOffsetResult(fetchedTimestampOffsets, partitionsToRetry);
                            listOffsetRequestsFuture.complete(result);
                        }
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    synchronized (listOffsetRequestsFuture) {
                        // Only the first failure is propagated; later outcomes find the future done.
                        if (!listOffsetRequestsFuture.isDone()) {
                            listOffsetRequestsFuture.raise(e);
                        }
                    }
                }
            });
        }
        return listOffsetRequestsFuture;
    }

    /**
     * Builds the per-partition ListOffsets request data for every partition whose leader is known
     * and reachable, and groups it by node.
     *
     * <p>Partitions with an unknown leader (which also triggers a metadata update request) or
     * with a leader the client reports as unavailable are added to {@code partitionsToRetry}
     * instead of being included in the result.
     *
     * @param timestampsToSearch timestamp to search for, keyed by partition
     * @param partitionsToRetry  output set receiving the partitions that could not be sent
     * @return request data keyed by node, then by partition
     */
    private Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests(Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> partitionsToRetry) {
        Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> requestDataByPartition = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> search : timestampsToSearch.entrySet()) {
            TopicPartition tp = search.getKey();
            Long targetTimestamp = search.getValue();
            Metadata.LeaderAndEpoch leaderAndEpoch = this.metadata.currentLeader(tp);

            if (!leaderAndEpoch.leader.isPresent()) {
                this.log.debug("Leader for partition {} is unknown for fetching offset {}", (Object) tp, (Object) targetTimestamp);
                this.metadata.requestUpdate();
                partitionsToRetry.add(tp);
            } else {
                Node leader = leaderAndEpoch.leader.get();
                if (this.client.isUnavailable(leader)) {
                    // Give the client a chance to raise a pending authentication failure for this node.
                    this.client.maybeThrowAuthFailure(leader);
                    this.log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires", (Object) leader, (Object) tp);
                    partitionsToRetry.add(tp);
                } else {
                    // -1 is sent when no leader epoch is known for the partition.
                    int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(-1);
                    ListOffsetsRequestData.ListOffsetsPartition requestData = new ListOffsetsRequestData.ListOffsetsPartition()
                            .setPartitionIndex(tp.partition())
                            .setTimestamp(targetTimestamp)
                            .setCurrentLeaderEpoch(currentLeaderEpoch);
                    requestDataByPartition.put(tp, requestData);
                }
            }
        }
        return this.regroupPartitionMapByNode(requestDataByPartition);
    }

    /**
     * Sends a single ListOffsets request to one node and adapts the raw client response into a
     * {@code ListOffsetResult} future.
     *
     * @param node               node to send the request to
     * @param timestampsToSearch request data keyed by partition
     * @param requireTimestamp   forwarded to the request builder
     * @return future that is completed or failed by {@code handleListOffsetResponse} once the
     *         response arrives
     */
    private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> timestampsToSearch, boolean requireTimestamp) {
        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
                .forConsumer(requireTimestamp, this.isolationLevel, false)
                .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
        this.log.debug("Sending ListOffsetRequest {} to broker {}", (Object) builder, (Object) node);

        RequestFutureAdapter<ClientResponse, ListOffsetResult> responseAdapter = new RequestFutureAdapter<ClientResponse, ListOffsetResult>() {

            @Override
            public void onSuccess(ClientResponse response, RequestFuture<ListOffsetResult> future) {
                ListOffsetsResponse listOffsetsResponse = (ListOffsetsResponse) response.responseBody();
                OffsetFetcher.this.log.trace("Received ListOffsetResponse {} from broker {}", (Object) listOffsetsResponse, (Object) node);
                OffsetFetcher.this.handleListOffsetResponse(listOffsetsResponse, future);
            }
        };
        return this.client.send(node, builder).compose(responseAdapter);
    }

    /**
     * Translates a ListOffsets response into a {@code ListOffsetResult} and completes or fails
     * the given future with it.
     *
     * <p>Per partition:
     * <ul>
     *   <li>no error: the offset is recorded unless it is {@code -1}; a response carrying
     *       old-style offsets must contain exactly one of them, otherwise the future fails with
     *       {@link IllegalStateException} and processing stops;</li>
     *   <li>{@code UNSUPPORTED_FOR_MESSAGE_FORMAT}: logged, and the partition is neither recorded
     *       nor retried;</li>
     *   <li>{@code TOPIC_AUTHORIZATION_FAILED}: the topic is collected, and the future fails with
     *       {@link TopicAuthorizationException} after all partitions have been inspected;</li>
     *   <li>every other error: the partition is added to the retry set.</li>
     * </ul>
     *
     * @param listOffsetsResponse response to translate
     * @param future              future to complete or fail with the outcome
     */
    private void handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse, RequestFuture<ListOffsetResult> future) {
        Map<TopicPartition, ListOffsetData> fetchedOffsets = new HashMap<>();
        Set<TopicPartition> partitionsToRetry = new HashSet<>();
        Set<String> unauthorizedTopics = new HashSet<>();

        for (ListOffsetsResponseData.ListOffsetsTopicResponse topic : listOffsetsResponse.topics()) {
            for (ListOffsetsResponseData.ListOffsetsPartitionResponse partition : topic.partitions()) {
                TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
                Errors error = Errors.forCode(partition.errorCode());
                switch (error) {
                    case NONE:
                        if (!partition.oldStyleOffsets().isEmpty()) {
                            // Response carrying old-style offsets: exactly one offset is expected.
                            if (partition.oldStyleOffsets().size() > 1) {
                                future.raise(new IllegalStateException("Unexpected partitionData response of length " + partition.oldStyleOffsets().size()));
                                return;
                            }
                            long legacyOffset = partition.oldStyleOffsets().get(0);
                            this.log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", (Object) topicPartition, (Object) legacyOffset);
                            if (legacyOffset != -1L) {
                                fetchedOffsets.put(topicPartition, new ListOffsetData(legacyOffset, null, Optional.empty()));
                            }
                        } else {
                            this.log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", topicPartition, partition.offset(), partition.timestamp());
                            if (partition.offset() != -1L) {
                                // A leader epoch of -1 in the response is treated as "no epoch".
                                Optional<Integer> leaderEpoch;
                                if (partition.leaderEpoch() == -1) {
                                    leaderEpoch = Optional.empty();
                                } else {
                                    leaderEpoch = Optional.of(partition.leaderEpoch());
                                }
                                fetchedOffsets.put(topicPartition, new ListOffsetData(partition.offset(), partition.timestamp(), leaderEpoch));
                            }
                        }
                        break;
                    case UNSUPPORTED_FOR_MESSAGE_FORMAT:
                        this.log.debug("Cannot search by timestamp for partition {} because the message format version is before 0.10.0", (Object) topicPartition);
                        break;
                    case NOT_LEADER_OR_FOLLOWER:
                    case REPLICA_NOT_AVAILABLE:
                    case KAFKA_STORAGE_ERROR:
                    case OFFSET_NOT_AVAILABLE:
                    case LEADER_NOT_AVAILABLE:
                    case FENCED_LEADER_EPOCH:
                    case UNKNOWN_LEADER_EPOCH:
                        this.log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", (Object) topicPartition, (Object) error);
                        partitionsToRetry.add(topicPartition);
                        break;
                    case UNKNOWN_TOPIC_OR_PARTITION:
                        this.log.warn("Received unknown topic or partition error in ListOffset request for partition {}", (Object) topicPartition);
                        partitionsToRetry.add(topicPartition);
                        break;
                    case TOPIC_AUTHORIZATION_FAILED:
                        unauthorizedTopics.add(topicPartition.topic());
                        break;
                    default:
                        this.log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.", (Object) topicPartition, (Object) error.message());
                        partitionsToRetry.add(topicPartition);
                        break;
                }
            }
        }

        if (unauthorizedTopics.isEmpty()) {
            future.complete(new ListOffsetResult(fetchedOffsets, partitionsToRetry));
        } else {
            future.raise(new TopicAuthorizationException(unauthorizedTopics));
        }
    }

    /**
     * Re-checks the position of every assigned partition against its current leader, but only
     * when the metadata update version differs from the one seen on the previous call.
     *
     * <p>The remembered version is replaced on every call, whether or not it changed.
     */
    public void validatePositionsOnMetadataChange() {
        int currentVersion = this.metadata.updateVersion();
        int previousVersion = this.metadataUpdateVersion.getAndSet(currentVersion);
        if (previousVersion == currentVersion) {
            return;
        }
        for (TopicPartition assigned : this.subscriptions.assignedPartitions()) {
            Metadata.LeaderAndEpoch leaderAndEpoch = this.metadata.currentLeader(assigned);
            this.subscriptions.maybeValidatePositionForCurrentLeader(this.apiVersions, assigned, leaderAndEpoch);
        }
    }

    /**
     * Groups fetch positions by the leader node recorded inside each position, dropping
     * positions that have no leader.
     *
     * @param partitionMap fetch positions keyed by partition
     * @return the same positions keyed by leader node, then by partition
     */
    private Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader(Map<TopicPartition, SubscriptionState.FetchPosition> partitionMap) {
        Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> positionsByLeader = new HashMap<>();
        for (Map.Entry<TopicPartition, SubscriptionState.FetchPosition> entry : partitionMap.entrySet()) {
            Optional<Node> leader = entry.getValue().currentLeader.leader;
            if (leader.isPresent()) {
                positionsByLeader
                        .computeIfAbsent(leader.get(), node -> new HashMap<>())
                        .put(entry.getKey(), entry.getValue());
            }
        }
        return positionsByLeader;
    }

    /**
     * Groups a per-partition map by the node that the current metadata snapshot names as the
     * partition's leader.
     *
     * <p>NOTE(review): {@code Collectors.groupingBy} rejects null keys, so this relies on
     * {@code leaderFor} returning a node for every partition passed in; confirm that callers
     * guarantee this.
     *
     * @param partitionMap values keyed by partition
     * @param <T>          value type
     * @return the same values keyed by leader node, then by partition
     */
    private <T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(Map<TopicPartition, T> partitionMap) {
        Function<Map.Entry<TopicPartition, T>, Node> leaderOfEntry =
                entry -> this.metadata.fetch().leaderFor(entry.getKey());
        return partitionMap.entrySet()
                .stream()
                .collect(Collectors.groupingBy(
                        leaderOfEntry,
                        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    /**
     * Collects the distinct topic names of the given partitions.
     *
     * @param partitions partitions to inspect
     * @return set of topic names
     */
    private Set<String> topicsForPartitions(Collection<TopicPartition> partitions) {
        Set<String> topics = new HashSet<>();
        for (TopicPartition tp : partitions) {
            topics.add(tp.topic());
        }
        return topics;
    }

    /**
     * Outcome of one or more ListOffsets requests: the offsets that were fetched and the
     * partitions that should be tried again.
     *
     * <p>The collections passed to the two-argument constructor are stored as-is, not copied.
     */
    static class ListOffsetResult {
        private final Map<TopicPartition, ListOffsetData> fetchedOffsets;
        private final Set<TopicPartition> partitionsToRetry;

        /** Creates an empty result backed by fresh mutable collections. */
        ListOffsetResult() {
            this(new HashMap<TopicPartition, ListOffsetData>(), new HashSet<TopicPartition>());
        }

        /**
         * @param fetchedOffsets         fetched offset data keyed by partition
         * @param partitionsNeedingRetry partitions whose lookup should be retried
         */
        ListOffsetResult(Map<TopicPartition, ListOffsetData> fetchedOffsets, Set<TopicPartition> partitionsNeedingRetry) {
            this.partitionsToRetry = partitionsNeedingRetry;
            this.fetchedOffsets = fetchedOffsets;
        }
    }

    /**
     * Immutable holder for what a ListOffsets response reports for one partition.
     */
    static class ListOffsetData {
        /** Offset reported for the partition. */
        final long offset;
        /** Timestamp reported alongside the offset; {@code null} when the response carried none. */
        final Long timestamp;
        /** Leader epoch reported alongside the offset, if any. */
        final Optional<Integer> leaderEpoch;

        /**
         * @param offset      offset reported for the partition
         * @param timestamp   timestamp reported with the offset, or {@code null}
         * @param leaderEpoch leader epoch reported with the offset, or empty
         */
        ListOffsetData(long offset, Long timestamp, Optional<Integer> leaderEpoch) {
            this.leaderEpoch = leaderEpoch;
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }
}

