/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.CheckStopReadingFn;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaMetrics;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class ReadFromKafkaDoFn<@UnknownKeyFor K, @UnknownKeyFor V>
extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
    private static final @UnknownKeyFor @NonNull @Initialized AtomicLong FN_ID = new AtomicLong();
    private final @UnknownKeyFor @NonNull @Initialized long fnId = FN_ID.getAndIncrement();
    private final @Nullable @UnknownKeyFor @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> offsetConsumerConfig;
    private final @Nullable @UnknownKeyFor @Initialized CheckStopReadingFn checkStopReadingFn;
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> consumerFactoryFn;
    private final @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> extractOutputTimestampFn;
    private final @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Instant, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant>> createWatermarkEstimatorFn;
    private final @Nullable @UnknownKeyFor @Initialized TimestampPolicyFactory<K, V> timestampPolicyFactory;
    private final @UnknownKeyFor @NonNull @Initialized BadRecordRouter badRecordRouter;
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag;
    private transient @Nullable @UnknownKeyFor @Initialized Deserializer<K> keyDeserializerInstance = null;
    private transient @Nullable @UnknownKeyFor @Initialized Deserializer<V> valueDeserializerInstance = null;
    private transient @Nullable @UnknownKeyFor @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaLatestOffsetEstimator> offsetEstimatorCache;
    private transient @Nullable @UnknownKeyFor @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaIOUtils.MovingAvg> avgRecordSizeCache;
    private static final @UnknownKeyFor @NonNull @Initialized long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized long consumerPollingTimeout;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized DeserializerProvider<K> keyDeserializerProvider;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized DeserializerProvider<V> valueDeserializerProvider;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> consumerConfig;
    @VisibleForTesting
    static final @UnknownKeyFor @NonNull @Initialized String METRIC_NAMESPACE = "KafkaIOReader";
    @VisibleForTesting
    static final @UnknownKeyFor @NonNull @Initialized String RAW_SIZE_METRIC_PREFIX = "rawSize/";

    static <K, V> @UnknownKeyFor @NonNull @Initialized ReadFromKafkaDoFn<K, V> create(@UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
        if (transform.isBounded()) {
            return new Bounded<K, V>(transform, recordTag);
        }
        return new Unbounded<K, V>(transform, recordTag);
    }

    private ReadFromKafkaDoFn(@UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
        this.consumerConfig = transform.getConsumerConfig();
        this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
        this.keyDeserializerProvider = (DeserializerProvider)Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider());
        this.valueDeserializerProvider = (DeserializerProvider)Preconditions.checkArgumentNotNull(transform.getValueDeserializerProvider());
        this.consumerFactoryFn = transform.getConsumerFactoryFn();
        this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn();
        this.createWatermarkEstimatorFn = transform.getCreateWatermarkEstimatorFn();
        this.timestampPolicyFactory = transform.getTimestampPolicyFactory();
        this.checkStopReadingFn = transform.getCheckStopReadingFn();
        this.badRecordRouter = transform.getBadRecordRouter();
        this.recordTag = recordTag;
        this.consumerPollingTimeout = transform.getConsumerPollingTimeout() > 0L ? transform.getConsumerPollingTimeout() : 2L;
    }

    @DoFn.GetInitialRestriction
    public @UnknownKeyFor @NonNull @Initialized OffsetRange initialRestriction(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor) {
        Map<String, Object> updatedConsumerConfig = this.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
        LOG.info("Creating Kafka consumer for initial restriction for {}", (Object)kafkaSourceDescriptor);
        try (Consumer offsetConsumer = (Consumer)this.consumerFactoryFn.apply(updatedConsumerConfig);){
            offsetConsumer.assign((Collection)ImmutableList.of((Object)partition));
            @Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
            long startOffset = kafkaSourceDescriptor.getStartReadOffset() != null ? kafkaSourceDescriptor.getStartReadOffset() : (startReadTime != null ? ConsumerSpEL.offsetForTime(offsetConsumer, partition, startReadTime) : offsetConsumer.position(partition));
            long endOffset = Long.MAX_VALUE;
            @Nullable Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
            if (kafkaSourceDescriptor.getStopReadOffset() != null) {
                endOffset = kafkaSourceDescriptor.getStopReadOffset();
            } else if (stopReadTime != null) {
                endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, stopReadTime);
            }
            new OffsetRange(startOffset, endOffset);
            Lineage.getSources().add("kafka", (Iterable)ImmutableList.of((Object)((String)updatedConsumerConfig.get("bootstrap.servers")), (Object)((String)MoreObjects.firstNonNull((Object)kafkaSourceDescriptor.getTopic(), (Object)partition.topic()))));
            OffsetRange offsetRange = new OffsetRange(startOffset, endOffset);
            return offsetRange;
        }
    }

    @DoFn.GetInitialWatermarkEstimatorState
    public @UnknownKeyFor @NonNull @Initialized Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp @UnknownKeyFor @NonNull @Initialized Instant currentElementTimestamp) {
        return currentElementTimestamp;
    }

    @DoFn.NewWatermarkEstimator
    public @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState @UnknownKeyFor @NonNull @Initialized Instant watermarkEstimatorState) {
        SerializableFunction createWatermarkEstimatorFn = (SerializableFunction)Preconditions.checkStateNotNull(this.createWatermarkEstimatorFn);
        return (WatermarkEstimator)createWatermarkEstimatorFn.apply((Object)ReadFromKafkaDoFn.ensureTimestampWithinBounds(watermarkEstimatorState));
    }

    @DoFn.GetSize
    public @UnknownKeyFor @NonNull @Initialized double getSize(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange offsetRange) throws @UnknownKeyFor @NonNull @Initialized ExecutionException {
        LoadingCache avgRecordSizeCache = (LoadingCache)Preconditions.checkStateNotNull(this.avgRecordSizeCache);
        @Nullable KafkaIOUtils.MovingAvg avgRecordSize = (KafkaIOUtils.MovingAvg)avgRecordSizeCache.getIfPresent((Object)kafkaSourceDescriptor);
        double estimatedOffsetRange = this.restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
        return avgRecordSize == null ? estimatedOffsetRange : estimatedOffsetRange * avgRecordSize.get();
    }

    @DoFn.NewTracker
    public @UnknownKeyFor @NonNull @Initialized OffsetRangeTracker restrictionTracker(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange restriction) throws @UnknownKeyFor @NonNull @Initialized ExecutionException {
        if (restriction.getTo() < Long.MAX_VALUE) {
            return new OffsetRangeTracker(restriction);
        }
        LoadingCache offsetEstimatorCache = (LoadingCache)Preconditions.checkStateNotNull(this.offsetEstimatorCache);
        KafkaLatestOffsetEstimator offsetEstimator = (KafkaLatestOffsetEstimator)offsetEstimatorCache.get((Object)kafkaSourceDescriptor);
        return new GrowableOffsetRangeTracker(restriction.getFrom(), (GrowableOffsetRangeTracker.RangeEndEstimator)offsetEstimator);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @DoFn.ProcessElement
    public // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DoFn.ProcessContinuation processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized Long> tracker, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver receiver) throws @UnknownKeyFor @NonNull @Initialized Exception {
        LoadingCache avgRecordSizeCache = (LoadingCache)Preconditions.checkStateNotNull(this.avgRecordSizeCache);
        LoadingCache offsetEstimatorCache = (LoadingCache)Preconditions.checkStateNotNull(this.offsetEstimatorCache);
        Deserializer keyDeserializerInstance = (Deserializer)Preconditions.checkStateNotNull(this.keyDeserializerInstance);
        Deserializer valueDeserializerInstance = (Deserializer)Preconditions.checkStateNotNull(this.valueDeserializerInstance);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        Distribution rawSizes = Metrics.distribution((String)METRIC_NAMESPACE, (String)(RAW_SIZE_METRIC_PREFIX + topicPartition.toString()));
        Gauge backlogBytes = Metrics.gauge((String)METRIC_NAMESPACE, (String)("rawSize/backlogBytes_" + topicPartition.toString()));
        if (this.checkStopReadingFn != null && ((Boolean)this.checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())).booleanValue()) {
            tracker.tryClaim((Object)(((OffsetRange)tracker.currentRestriction()).getTo() - 1L));
            return DoFn.ProcessContinuation.stop();
        }
        Map<String, Object> updatedConsumerConfig = this.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TimestampPolicy timestampPolicy = null;
        if (this.timestampPolicyFactory != null) {
            timestampPolicy = this.timestampPolicyFactory.createTimestampPolicy(topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
        }
        LOG.info("Creating Kafka consumer for process continuation for {}", (Object)kafkaSourceDescriptor);
        try (Consumer consumer = (Consumer)this.consumerFactoryFn.apply(updatedConsumerConfig);){
            long startOffset;
            consumer.assign((Collection)ImmutableList.of((Object)kafkaSourceDescriptor.getTopicPartition()));
            long expectedOffset = startOffset = ((OffsetRange)tracker.currentRestriction()).getFrom();
            consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
            ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
            long skippedRecords = 0L;
            Stopwatch sw = Stopwatch.createStarted();
            KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
            try {
                while (true) {
                    KafkaIOUtils.MovingAvg avgRecordSize = (KafkaIOUtils.MovingAvg)avgRecordSizeCache.getUnchecked((Object)kafkaSourceDescriptor);
                    rawRecords = this.poll((Consumer<byte[], byte[]>)consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
                    if (rawRecords.isEmpty()) {
                        Iterator iterator;
                        if (!this.topicPartitionExists(kafkaSourceDescriptor.getTopicPartition(), consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
                            iterator = DoFn.ProcessContinuation.stop();
                            return iterator;
                        }
                        if (timestampPolicy != null) {
                            this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                        }
                        iterator = DoFn.ProcessContinuation.resume();
                        return iterator;
                    }
                    for (ConsumerRecord rawRecord : rawRecords) {
                        DoFn.ProcessContinuation processContinuation;
                        if (rawRecord.offset() < startOffset) {
                            if (sw.elapsed().getSeconds() > 10L) {
                                LOG.error("The expected offset ({}) was not reached even after skipping consumed records for 10 seconds. The offset we could reach was {}. The processing of this bundle will be attempted at a later time.", (Object)expectedOffset, (Object)rawRecord.offset());
                                processContinuation = DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds((long)10L));
                                return processContinuation;
                            }
                            ++skippedRecords;
                            continue;
                        }
                        if (skippedRecords > 0L) {
                            LOG.warn("{} records were skipped due to seek returning an earlier position than requested position of {}", (Object)skippedRecords, (Object)expectedOffset);
                            skippedRecords = 0L;
                        }
                        if (!tracker.tryClaim((Object)rawRecord.offset())) {
                            processContinuation = DoFn.ProcessContinuation.stop();
                            return processContinuation;
                        }
                        try {
                            Instant outputTimestamp;
                            KafkaRecord kafkaRecord = new KafkaRecord(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), ConsumerSpEL.getRecordTimestamp((ConsumerRecord<byte[], byte[]>)rawRecord), ConsumerSpEL.getRecordTimestampType((ConsumerRecord<byte[], byte[]>)rawRecord), ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null, ConsumerSpEL.deserializeKey(keyDeserializerInstance, (ConsumerRecord<byte[], byte[]>)rawRecord), ConsumerSpEL.deserializeValue(valueDeserializerInstance, (ConsumerRecord<byte[], byte[]>)rawRecord));
                            int recordSize = (rawRecord.key() == null ? 0 : ((byte[])rawRecord.key()).length) + (rawRecord.value() == null ? 0 : ((byte[])rawRecord.value()).length);
                            avgRecordSize.update(recordSize);
                            rawSizes.update((long)recordSize);
                            expectedOffset = rawRecord.offset() + 1L;
                            if (timestampPolicy != null) {
                                KafkaUnboundedReader.TimestampPolicyContext context = this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                                outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
                            } else {
                                Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
                                outputTimestamp = (Instant)this.extractOutputTimestampFn.apply(kafkaRecord);
                            }
                            receiver.get(this.recordTag).outputWithTimestamp((Object)KV.of((Object)kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
                        }
                        catch (SerializationException e) {
                            this.badRecordRouter.route(receiver, (Object)rawRecord, null, (Exception)((Object)e), "Failure deserializing Key or Value of Kakfa record reading from Kafka");
                            if (timestampPolicy == null) continue;
                            this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                        }
                    }
                    backlogBytes.set((long)(BigDecimal.valueOf((Long)Preconditions.checkStateNotNull((Object)((KafkaLatestOffsetEstimator)offsetEstimatorCache.get((Object)kafkaSourceDescriptor)).estimate())).subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128).doubleValue() * avgRecordSize.get()));
                    kafkaMetrics.updateBacklogBytes(kafkaSourceDescriptor.getTopic(), kafkaSourceDescriptor.getPartition(), (long)(BigDecimal.valueOf((Long)Preconditions.checkStateNotNull((Object)((KafkaLatestOffsetEstimator)offsetEstimatorCache.get((Object)kafkaSourceDescriptor)).estimate())).subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128).doubleValue() * avgRecordSize.get()));
                }
            }
            finally {
                kafkaMetrics.flushBufferedMetrics();
            }
        }
    }

    private @UnknownKeyFor @NonNull @Initialized boolean topicPartitionExists(@UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized PartitionInfo> partitionInfos) {
        return partitionInfos.stream().anyMatch(partitionInfo -> partitionInfo.partition() == topicPartition.partition());
    }

    private @UnknownKeyFor @NonNull @Initialized ConsumerRecords<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> poll(@UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> consumer, @UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition, @UnknownKeyFor @NonNull @Initialized KafkaMetrics kafkaMetrics) {
        ConsumerRecords rawRecords;
        Stopwatch sw = Stopwatch.createStarted();
        long previousPosition = -1L;
        java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
        java.time.Duration elapsed = java.time.Duration.ZERO;
        do {
            rawRecords = consumer.poll(timeout.minus(elapsed));
            elapsed = sw.elapsed();
            kafkaMetrics.updateSuccessfulRpcMetrics(topicPartition.topic(), java.time.Duration.ofMillis(elapsed.toMillis()));
            if (!rawRecords.isEmpty()) {
                return rawRecords;
            }
            if (previousPosition != (previousPosition = consumer.position(topicPartition))) continue;
            return rawRecords;
        } while (elapsed.toMillis() < timeout.toMillis());
        LOG.warn("No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.", (Object)this.consumerPollingTimeout);
        return rawRecords;
    }

    private @UnknownKeyFor @NonNull @Initialized KafkaUnboundedReader.TimestampPolicyContext updateWatermarkManually(@UnknownKeyFor @NonNull @Initialized TimestampPolicy<K, V> timestampPolicy, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized Long> tracker) {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState((boolean)(watermarkEstimator instanceof ManualWatermarkEstimator));
        KafkaUnboundedReader.TimestampPolicyContext context = new KafkaUnboundedReader.TimestampPolicyContext((long)((RestrictionTracker.HasProgress)tracker).getProgress().getWorkRemaining(), Instant.now());
        ((ManualWatermarkEstimator)watermarkEstimator).setWatermark(ReadFromKafkaDoFn.ensureTimestampWithinBounds(timestampPolicy.getWatermark(context)));
        return context;
    }

    @DoFn.GetRestrictionCoder
    public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized OffsetRange> restrictionCoder() {
        return new OffsetRange.Coder();
    }

    @DoFn.Setup
    public void setup() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.avgRecordSizeCache = SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent(this.fnId, k -> CacheBuilder.newBuilder().maximumSize(1000L).build((CacheLoader)new CacheLoader<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg>(){

            public @UnknownKeyFor @NonNull @Initialized KafkaIOUtils.MovingAvg load(@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor) throws @UnknownKeyFor @NonNull @Initialized Exception {
                return new KafkaIOUtils.MovingAvg();
            }
        }));
        this.keyDeserializerInstance = this.keyDeserializerProvider.getDeserializer(this.consumerConfig, true);
        this.valueDeserializerInstance = this.valueDeserializerProvider.getDeserializer(this.consumerConfig, false);
        this.offsetEstimatorCache = SharedStateHolder.OFFSET_ESTIMATOR_CACHE.computeIfAbsent(this.fnId, k -> {
            ImmutableMap consumerConfig = ImmutableMap.copyOf(this.consumerConfig);
            @Nullable ImmutableMap offsetConsumerConfig = this.offsetConsumerConfig == null ? null : ImmutableMap.copyOf(this.offsetConsumerConfig);
            return CacheBuilder.newBuilder().weakValues().expireAfterAccess(1L, TimeUnit.MINUTES).build((CacheLoader)new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>((Map)consumerConfig, (Map)offsetConsumerConfig){
                final /* synthetic */ Map val$consumerConfig;
                final /* synthetic */ Map val$offsetConsumerConfig;
                {
                    this.val$consumerConfig = map;
                    this.val$offsetConsumerConfig = map2;
                }

                public @UnknownKeyFor @NonNull @Initialized KafkaLatestOffsetEstimator load(@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor) throws @UnknownKeyFor @NonNull @Initialized Exception {
                    LOG.info("Creating Kafka consumer for offset estimation for {}", (Object)kafkaSourceDescriptor);
                    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
                    Map updatedConsumerConfig = ReadFromKafkaDoFn.this.overrideBootstrapServersConfig(this.val$consumerConfig, kafkaSourceDescriptor);
                    Consumer offsetConsumer = (Consumer)ReadFromKafkaDoFn.this.consumerFactoryFn.apply(KafkaIOUtils.getOffsetConsumerConfig("tracker-" + topicPartition, this.val$offsetConsumerConfig, updatedConsumerConfig));
                    return new KafkaLatestOffsetEstimator((Consumer<byte[], byte[]>)offsetConsumer, topicPartition);
                }
            });
        });
        if (this.checkStopReadingFn != null) {
            this.checkStopReadingFn.setup();
        }
    }

    @DoFn.Teardown
    public void teardown() throws @UnknownKeyFor @NonNull @Initialized Exception {
        LoadingCache avgRecordSizeCache = (LoadingCache)Preconditions.checkStateNotNull(this.avgRecordSizeCache);
        LoadingCache offsetEstimatorCache = (LoadingCache)Preconditions.checkStateNotNull(this.offsetEstimatorCache);
        try {
            if (this.valueDeserializerInstance != null) {
                Closeables.close(this.valueDeserializerInstance, (boolean)true);
                this.valueDeserializerInstance = null;
            }
            if (this.keyDeserializerInstance != null) {
                Closeables.close(this.keyDeserializerInstance, (boolean)true);
                this.keyDeserializerInstance = null;
            }
        }
        catch (Exception anyException) {
            LOG.warn("Fail to close resource during finishing bundle.", (Throwable)anyException);
        }
        if (this.checkStopReadingFn != null) {
            this.checkStopReadingFn.teardown();
        }
        avgRecordSizeCache.cleanUp();
        offsetEstimatorCache.cleanUp();
    }

    private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> overrideBootstrapServersConfig(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> currentConfig, @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor description) {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState((currentConfig.containsKey("bootstrap.servers") || description.getBootStrapServers() != null ? 1 : 0) != 0);
        HashMap<String, Object> config = new HashMap<String, Object>(currentConfig);
        if (description.getBootStrapServers() != null && description.getBootStrapServers().size() > 0) {
            config.put("bootstrap.servers", String.join((CharSequence)",", description.getBootStrapServers()));
        }
        return config;
    }

    private static @UnknownKeyFor @NonNull @Initialized Instant ensureTimestampWithinBounds(@UnknownKeyFor @NonNull @Initialized Instant timestamp) {
        if (timestamp.isBefore((ReadableInstant)BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
        } else if (timestamp.isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return timestamp;
    }

    private static class KafkaLatestOffsetEstimator
    implements GrowableOffsetRangeTracker.RangeEndEstimator {
        private final @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> offsetConsumer;
        private final @UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition;
        private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Long> memoizedBacklog;

        KafkaLatestOffsetEstimator(@UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> offsetConsumer, @UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition) {
            this.offsetConsumer = offsetConsumer;
            this.topicPartition = topicPartition;
            this.memoizedBacklog = Suppliers.memoizeWithExpiration(() -> {
                Consumer consumer = offsetConsumer;
                synchronized (consumer) {
                    return (Long)Preconditions.checkStateNotNull((Object)((Long)offsetConsumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition)), (String)"No end offset found for partition %s.", (Object)topicPartition);
                }
            }, (long)1L, (TimeUnit)TimeUnit.SECONDS);
        }

        protected void finalize() {
            try {
                Closeables.close(this.offsetConsumer, (boolean)true);
                LOG.info("Offset Estimator consumer was closed for {}", (Object)this.topicPartition);
            }
            catch (Exception anyException) {
                LOG.warn("Failed to close offset consumer for {}", (Object)this.topicPartition);
            }
        }

        public @UnknownKeyFor @NonNull @Initialized long estimate() {
            return (Long)this.memoizedBacklog.get();
        }
    }

    private static final class SharedStateHolder {
        private static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaLatestOffsetEstimator>> OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>();
        private static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaIOUtils.MovingAvg>> AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<Long, LoadingCache<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg>>();

        private SharedStateHolder() {
        }
    }

    @DoFn.BoundedPerElement
    private static class Bounded<@UnknownKeyFor K, @UnknownKeyFor V>
    extends ReadFromKafkaDoFn<K, V> {
        Bounded(@UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
            super(transform, recordTag);
        }
    }

    @DoFn.UnboundedPerElement
    private static class Unbounded<@UnknownKeyFor K, @UnknownKeyFor V>
    extends ReadFromKafkaDoFn<K, V> {
        Unbounded(@UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
            super(transform, recordTag);
        }
    }
}

