/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedSource;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaUnboundedReader<K, V>
extends UnboundedSource.UnboundedReader<KafkaRecord<K, V>> {
    // Note: the logger is registered under KafkaUnboundedSource's class, not this reader's.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);
    // Namespace and names of the two checkpoint-commit counters created further down.
    @VisibleForTesting
    static final String METRIC_NAMESPACE = "KafkaIOReader";
    @VisibleForTesting
    static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = "checkpointMarkCommitsEnqueued";
    private static final String CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC = "checkpointMarkCommitsSkipped";
    // The source this reader was created from; its spec supplies all configuration.
    private final KafkaUnboundedSource<K, V> source;
    // "Reader-" + source id; also what toString() returns, so it prefixes most log lines.
    private final String name;
    // Main consumer, created in start(). It is polled from consumerPollThread.
    private Consumer<byte[], byte[]> consumer;
    // One entry per assigned topic partition, built once in the constructor (immutable list).
    private final List<PartitionState<K, V>> partitionStates;
    // Most recently emitted record and its timestamp; both stay null until advance() succeeds once.
    private KafkaRecord<K, V> curRecord;
    private Instant curTimestamp;
    // Cycling iterator over partitions that still have records in the current batch.
    private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
    // Deserializers are instantiated reflectively and configured in start().
    private Deserializer<K> keyDeserializerInstance = null;
    private Deserializer<V> valueDeserializerInstance = null;
    // Source-wide and per-split metrics; the per-split ones are keyed by the source id in the constructor.
    private final Counter elementsRead = SourceMetrics.elementsRead();
    private final Counter bytesRead = SourceMetrics.bytesRead();
    private final Counter elementsReadBySplit;
    private final Counter bytesReadBySplit;
    private final Gauge backlogBytesOfSplit;
    private final Gauge backlogElementsOfSplit;
    // Incremented in finalizeCheckpointMarkAsync(): every call counts as enqueued, and a call that
    // replaces a mark which was not yet picked up also counts as skipped.
    private final Counter checkpointMarkCommitsEnqueued = Metrics.counter((String)"KafkaIOReader", (String)"checkpointMarkCommitsEnqueued");
    private final Counter checkpointMarkCommitsSkipped = Metrics.counter((String)"KafkaIOReader", (String)"checkpointMarkCommitsSkipped");
    // Timeout passed to consumer.poll() in the poll loop.
    private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis((long)1000L);
    // How long nextBatch() waits for the poll loop to hand over a batch.
    private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis((long)10L);
    // How long the poll loop waits for the reader to take a batch before retrying.
    private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis((long)100L);
    // Single thread that first runs setupInitialOffset() per partition and then consumerPollLoop().
    private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
    // Set by the poll loop when it dies with an exception; checked by nextBatch().
    private AtomicReference<Exception> consumerPollException = new AtomicReference();
    // Hand-off point between the poll loop (offer) and nextBatch() (poll).
    private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue = new SynchronousQueue();
    // Latest finalized checkpoint waiting to be committed by the poll loop; null when nothing is pending.
    private AtomicReference<KafkaCheckpointMark> finalizedCheckpointMark = new AtomicReference();
    // Set in close(); makes the poll loop exit and silences offset-fetch errors during shutdown.
    private AtomicBoolean closed = new AtomicBoolean(false);
    // Second consumer, created in start() under its own group id and used only by updateLatestOffsets().
    private Consumer<byte[], byte[]> offsetConsumer;
    // Runs updateLatestOffsets() at a fixed rate once start() has scheduled it.
    private final ScheduledExecutorService offsetFetcherThread = Executors.newSingleThreadScheduledExecutor();
    // NOTE(review): neither constant below is referenced by name in the code shown; start() and the
    // offset checks use the literals 1L and -1L, presumably inlined copies of these - confirm.
    private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;
    private static final long UNINITIALIZED_OFFSET = -1L;
    // Helper used for partition assignment, record timestamps, offset-for-time lookup and seek-to-end.
    private transient ConsumerSpEL consumerSpEL = new ConsumerSpEL();
    // Returned by getWatermark() when a watermark function is configured but no record has been read yet.
    private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    /**
     * Creates the Kafka consumers and deserializers, positions every assigned partition, starts
     * the background poll loop and the periodic latest-offset fetcher, and then attempts to read
     * the first record.
     *
     * <p>Partition initialisation runs on the poll thread and is bounded by a timeout derived
     * from the consumer's {@code request.timeout.ms} setting (see
     * {@link #partitionInitTimeoutMillis(Object)}).
     *
     * @return the result of the first {@link #advance()} call
     * @throws IOException if the deserializers cannot be instantiated, if a partition cannot be
     *     initialised within the timeout, or if initialisation fails or is interrupted
     */
    public boolean start() throws IOException {
        KafkaIO.Read<K, V> spec = source.getSpec();
        consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
        consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
        try {
            keyDeserializerInstance = spec.getKeyDeserializer().getDeclaredConstructor().newInstance();
            valueDeserializerInstance = spec.getValueDeserializer().getDeclaredConstructor().newInstance();
        }
        catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new IOException("Could not instantiate deserializers", e);
        }
        keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
        valueDeserializerInstance.configure(spec.getConsumerConfig(), false);

        // The timeout is the same for every partition, so resolve it once up front.
        long initTimeoutMillis = partitionInitTimeoutMillis(spec.getConsumerConfig().get("request.timeout.ms"));
        for (PartitionState<K, V> pState : partitionStates) {
            // Run on the poll thread so that the consumer is only ever used from that thread.
            Future<?> future = consumerPollThread.submit(() -> setupInitialOffset(pState));
            try {
                future.get(initTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                // Unblock the consumer call that is still stuck on the poll thread.
                consumer.wakeup();
                String msg = String.format("%s: Timeout while initializing partition '%s'. Kafka client may not be able to connect to servers.", this, pState.topicPartition);
                LOG.error("{}", msg);
                throw new IOException(msg, e);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of silently consuming it.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
            catch (Exception e) {
                throw new IOException(e);
            }
            LOG.info("{}: reading from {} starting at offset {}", name, pState.topicPartition, pState.nextOffset);
        }
        consumerPollThread.submit(this::consumerPollLoop);

        // The offset consumer gets its own randomised group id and never commits offsets.
        Object groupId = spec.getConsumerConfig().get("group.id");
        String offsetGroupId = String.format("%s_offset_consumer_%d_%s", name, new Random().nextInt(Integer.MAX_VALUE), groupId == null ? "none" : groupId);
        HashMap<String, Object> offsetConsumerConfig = new HashMap<String, Object>(spec.getConsumerConfig());
        offsetConsumerConfig.put("group.id", offsetGroupId);
        offsetConsumerConfig.put("enable.auto.commit", false);
        offsetConsumerConfig.put("isolation.level", "read_uncommitted");
        offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
        consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());

        // Fetch once synchronously so backlog is available immediately, then keep refreshing.
        updateLatestOffsets();
        offsetFetcherThread.scheduleAtFixedRate(this::updateLatestOffsets, 0L, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
        return advance();
    }

    /**
     * Computes how long {@link #start()} waits for one partition to be initialised.
     *
     * @param requestTimeoutMs the raw {@code request.timeout.ms} consumer setting; may be
     *     {@code null}, any {@link Number}, or a numeric string
     * @return twice the configured request timeout, or 60 seconds when none is configured
     * @throws IOException if the value is present but is not a number
     */
    private static long partitionInitTimeoutMillis(@Nullable Object requestTimeoutMs) throws IOException {
        final long defaultPartitionInitTimeoutMillis = 60000L;
        final long kafkaRequestTimeoutMultiple = 2L;
        if (requestTimeoutMs == null) {
            return defaultPartitionInitTimeoutMillis;
        }
        long requestTimeoutMillis;
        if (requestTimeoutMs instanceof Number) {
            requestTimeoutMillis = ((Number)requestTimeoutMs).longValue();
        } else {
            try {
                requestTimeoutMillis = Long.parseLong(requestTimeoutMs.toString().trim());
            }
            catch (NumberFormatException e) {
                throw new IOException("Invalid value for request.timeout.ms: " + requestTimeoutMs, e);
            }
        }
        // Multiply in long arithmetic: an int product overflows for large timeouts.
        return kafkaRequestTimeoutMultiple * requestTimeoutMillis;
    }

    /**
     * Moves to the next record, cycling over the partitions of the current batch so that records
     * from different partitions are interleaved, and fetching a new batch when the current one
     * is exhausted.
     *
     * <p>Records whose offset is below the partition's expected next offset are logged and
     * skipped. Element counters are incremented only for records that are actually emitted, so
     * they stay consistent with the byte counters.
     *
     * @return {@code true} if a record is now available, {@code false} if no batch was ready
     * @throws IOException if the background poll thread has failed
     */
    public boolean advance() throws IOException {
        while (true) {
            if (!curBatch.hasNext()) {
                nextBatch();
                if (!curBatch.hasNext()) {
                    return false;
                }
                continue;
            }

            PartitionState<K, V> pState = curBatch.next();
            if (!pState.recordIter.hasNext()) {
                // This partition is drained for the current batch: drop it from the cycle.
                pState.recordIter = Collections.emptyIterator();
                curBatch.remove();
                continue;
            }

            ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
            long expected = pState.nextOffset;
            long offset = rawRecord.offset();
            if (offset < expected) {
                LOG.warn("{}: ignoring already consumed offset {} for {}", this, offset, pState.topicPartition);
                continue;
            }

            // Count only records that are emitted; skipped duplicates must not inflate the metric.
            elementsRead.inc();
            elementsReadBySplit.inc();

            long offsetGap = offset - expected;
            if (curRecord == null) {
                // Very first record of this reader: there is no meaningful gap yet.
                LOG.info("{}: first record offset {}", name, offset);
                offsetGap = 0L;
            }

            byte[] keyBytes = rawRecord.key();
            byte[] valueBytes = rawRecord.value();
            KafkaRecord<K, V> record = new KafkaRecord<>(
                rawRecord.topic(),
                rawRecord.partition(),
                rawRecord.offset(),
                consumerSpEL.getRecordTimestamp(rawRecord),
                consumerSpEL.getRecordTimestampType(rawRecord),
                ConsumerSpEL.hasHeaders ? rawRecord.headers() : null,
                keyDeserializerInstance.deserialize(rawRecord.topic(), keyBytes),
                valueDeserializerInstance.deserialize(rawRecord.topic(), valueBytes));
            curTimestamp = pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
            curRecord = record;

            int recordSize = (keyBytes == null ? 0 : keyBytes.length) + (valueBytes == null ? 0 : valueBytes.length);
            pState.recordConsumed(offset, recordSize, offsetGap);
            bytesRead.inc((long)recordSize);
            bytesReadBySplit.inc((long)recordSize);
            return true;
        }
    }

    /**
     * Returns the current watermark of this reader.
     *
     * <p>Without a configured watermark function, every partition's watermark is refreshed and
     * the smallest one is returned. With a watermark function, it is applied to the most recent
     * record, or the initial watermark is returned if nothing has been read yet.
     */
    public Instant getWatermark() {
        if (this.source.getSpec().getWatermarkFn() == null) {
            // Refreshes each partition's watermark as a side effect of computing the minimum.
            Optional<Instant> lowest = this.partitionStates.stream()
                .map(PartitionState::updateAndGetWatermark)
                .min(Comparator.naturalOrder());
            return lowest.get();
        }
        if (this.curRecord != null) {
            return (Instant)this.source.getSpec().getWatermarkFn().apply(this.curRecord);
        }
        LOG.debug("{}: getWatermark() : no records have been read yet.", (Object)this.name);
        return initialWatermark;
    }

    /**
     * Reports the current backlog and snapshots, per partition, the next offset to read and the
     * last watermark. The mark carries a reference to this reader only when committing offsets
     * on finalize is enabled in the spec.
     */
    public UnboundedSource.CheckpointMark getCheckpointMark() {
        reportBacklog();
        List<KafkaCheckpointMark.PartitionMark> marks = new ArrayList<>(partitionStates.size());
        for (PartitionState<K, V> state : partitionStates) {
            TopicPartition tp = state.topicPartition;
            marks.add(new KafkaCheckpointMark.PartitionMark(tp.topic(), tp.partition(), state.nextOffset, state.lastWatermark.getMillis()));
        }
        return new KafkaCheckpointMark(marks, source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty());
    }

    /** Returns the source this reader was created from. */
    public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
        return source;
    }

    /**
     * Returns the record produced by the last successful {@link #advance()}.
     *
     * <p>This implementation simply hands back the stored record; it is {@code null} until a
     * record has been read and the method itself never throws.
     */
    public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
        return curRecord;
    }

    /**
     * Returns the timestamp that the partition's timestamp policy assigned to the current
     * record; {@code null} until a record has been read.
     */
    public Instant getCurrentTimestamp() throws NoSuchElementException {
        return curTimestamp;
    }

    /**
     * Sums the approximate backlog, in bytes, over all partitions of this split.
     *
     * @return the total, or {@code -1} as soon as any partition reports an unknown backlog
     */
    public long getSplitBacklogBytes() {
        long total = 0L;
        Iterator<PartitionState<K, V>> states = partitionStates.iterator();
        while (states.hasNext()) {
            long partitionBacklog = states.next().approxBacklogInBytes();
            if (partitionBacklog != -1L) {
                total += partitionBacklog;
                continue;
            }
            // One unknown partition makes the whole split's backlog unknown.
            return -1L;
        }
        return total;
    }

    /** Returns the reader's name ("Reader-" followed by the source id), as used in log lines. */
    public String toString() {
        return name;
    }

    /**
     * Builds a reader for the partitions assigned to {@code source}.
     *
     * <p>When a checkpoint is supplied it must list exactly the assigned partitions, in the same
     * order; each partition then resumes from the checkpointed offset and watermark. Otherwise
     * every partition starts with an uninitialised offset ({@code -1}) and no previous watermark.
     *
     * @param source the source that owns this reader
     * @param checkpointMark checkpoint to resume from, or {@code null} for a fresh start
     */
    KafkaUnboundedReader(KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
        this.source = source;
        this.name = "Reader-" + source.getId();

        List<TopicPartition> assigned = source.getSpec().getTopicPartitions();
        boolean resuming = checkpointMark != null;
        if (resuming) {
            Preconditions.checkState(checkpointMark.getPartitions().size() == assigned.size(), "checkPointMark and assignedPartitions should match");
        }

        List<PartitionState<K, V>> states = new ArrayList<>(assigned.size());
        int index = 0;
        for (TopicPartition tp : assigned) {
            long nextOffset = -1L;
            Optional<Instant> prevWatermark = Optional.empty();
            if (resuming) {
                // Checkpointed partitions are matched to assigned partitions by position.
                KafkaCheckpointMark.PartitionMark ckptMark = checkpointMark.getPartitions().get(index);
                TopicPartition partition = new TopicPartition(ckptMark.getTopic(), ckptMark.getPartition());
                Preconditions.checkState(partition.equals(tp), "checkpointed partition %s and assigned partition %s don't match", partition, tp);
                nextOffset = ckptMark.getNextOffset();
                prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
            }
            TimestampPolicy<K, V> policy = source.getSpec().getTimestampPolicyFactory().createTimestampPolicy(tp, prevWatermark);
            states.add(new PartitionState<K, V>(tp, nextOffset, policy));
            index++;
        }
        this.partitionStates = ImmutableList.copyOf(states);

        // Per-split metrics are keyed by the source id.
        String splitId = String.valueOf(source.getId());
        this.elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
        this.bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
        this.backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
        this.backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
    }

    /**
     * Body of the background poll thread. Until the reader is closed it alternates between
     * polling Kafka for a batch and handing a non-empty batch to the reader through the
     * synchronous queue, and after each step commits a pending finalized checkpoint, if any.
     *
     * <p>An interrupt or a consumer wakeup ends the loop quietly. Any other exception is logged,
     * stored for {@link #nextBatch()} to report, and rethrown.
     */
    private void consumerPollLoop() {
        try {
            ConsumerRecords<byte[], byte[]> pending = ConsumerRecords.empty();
            while (!closed.get()) {
                try {
                    if (pending.isEmpty()) {
                        pending = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
                    } else {
                        // Keep the batch and retry on the next iteration if the reader did not take it.
                        boolean handedOff = availableRecordsQueue.offer(pending, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
                        if (handedOff) {
                            pending = ConsumerRecords.empty();
                        }
                    }
                    KafkaCheckpointMark toCommit = finalizedCheckpointMark.getAndSet(null);
                    if (toCommit != null) {
                        commitCheckpointMark(toCommit);
                    }
                }
                catch (InterruptedException e) {
                    LOG.warn("{}: consumer thread is interrupted", (Object)this, (Object)e);
                    break;
                }
                catch (WakeupException e) {
                    // A wakeup is the normal way close() (or a start() timeout) unblocks this thread.
                    break;
                }
            }
            LOG.info("{}: Returning from consumer pool loop", (Object)this);
        }
        catch (Exception e) {
            LOG.error("{}: Exception while reading from Kafka", (Object)this, (Object)e);
            consumerPollException.set(e);
            throw e;
        }
    }

    /**
     * Synchronously commits the offsets recorded in {@code checkpointMark} through the main
     * consumer. Partitions whose next offset is still uninitialised ({@code -1}) are left out.
     */
    private void commitCheckpointMark(KafkaCheckpointMark checkpointMark) {
        LOG.debug("{}: Committing finalized checkpoint {}", (Object)this, (Object)checkpointMark);
        List<KafkaCheckpointMark.PartitionMark> committable = checkpointMark.getPartitions().stream()
            .filter(mark -> mark.getNextOffset() != -1L)
            .collect(Collectors.toList());
        this.consumer.commitSync(
            committable.stream().collect(
                Collectors.toMap(
                    mark -> new TopicPartition(mark.getTopic(), mark.getPartition()),
                    mark -> new OffsetAndMetadata(mark.getNextOffset()))));
    }

    /**
     * Queues {@code checkpointMark} to be committed by the poll thread. Only the latest mark is
     * kept: if an earlier one was still waiting it is replaced and counted as skipped. Every
     * call is counted as enqueued.
     */
    void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) {
        KafkaCheckpointMark superseded = finalizedCheckpointMark.getAndSet(checkpointMark);
        boolean replacedPendingMark = superseded != null;
        if (replacedPendingMark) {
            checkpointMarkCommitsSkipped.inc();
        }
        checkpointMarkCommitsEnqueued.inc();
    }

    /**
     * Takes the next batch from the poll thread, waiting briefly if none is ready, and points
     * every partition at its share of that batch.
     *
     * <p>If no batch arrives in time the current batch is left empty. In that case a failure
     * recorded by the poll thread is surfaced as an {@link IOException}.
     *
     * @throws IOException if the poll thread has terminated with an exception
     */
    private void nextBatch() throws IOException {
        curBatch = Collections.emptyIterator();

        ConsumerRecords<byte[], byte[]> records;
        try {
            records = availableRecordsQueue.poll(RECORDS_DEQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("{}: Unexpected", (Object)this, (Object)e);
            return;
        }

        if (records == null) {
            // Nothing was handed over; find out whether that is because the poll thread died.
            Exception pollFailure = consumerPollException.get();
            if (pollFailure != null) {
                throw new IOException("Exception while reading from Kafka", pollFailure);
            }
            return;
        }

        for (PartitionState<K, V> state : partitionStates) {
            state.recordIter = records.records(state.topicPartition).iterator();
        }
        // Cycle over a private copy so that drained partitions can be removed from the rotation.
        curBatch = Iterators.cycle(new ArrayList<PartitionState<K, V>>(partitionStates));
    }

    /**
     * Positions the main consumer for one partition and records the offset reading starts from.
     *
     * <ul>
     *   <li>If the partition already has an offset (restored from a checkpoint), seek to it.
     *   <li>Otherwise, if a start read time is configured, look up the offset for that time,
     *       store it and seek to it.
     *   <li>Otherwise keep the consumer's current position and store it as the next offset.
     * </ul>
     *
     * @param pState state of the partition to initialise; its next offset is updated in place
     */
    private void setupInitialOffset(PartitionState<K, V> pState) {
        if (pState.nextOffset != UNINITIALIZED_OFFSET) {
            consumer.seek(pState.topicPartition, pState.nextOffset);
            return;
        }
        Instant startReadTime = source.getSpec().getStartReadTime();
        if (startReadTime != null) {
            pState.nextOffset = consumerSpEL.offsetForTime(consumer, pState.topicPartition, startReadTime);
            consumer.seek(pState.topicPartition, pState.nextOffset);
        } else {
            pState.nextOffset = consumer.position(pState.topicPartition);
        }
    }

    /**
     * Refreshes the latest known offset of every partition by seeking the offset consumer to
     * the end of the partition and reading its position.
     *
     * <p>A failure for one partition is logged and the remaining partitions are still processed,
     * unless the reader has been closed, in which case the refresh stops.
     */
    private void updateLatestOffsets() {
        for (PartitionState<K, V> state : partitionStates) {
            TopicPartition tp = state.topicPartition;
            try {
                // Taken before the fetch so the recorded time never post-dates the offset.
                Instant fetchTime = Instant.now();
                consumerSpEL.evaluateSeek2End(offsetConsumer, tp);
                long endOffset = offsetConsumer.position(tp);
                state.setLatestOffset(endOffset, fetchTime);
            }
            catch (Exception e) {
                if (closed.get()) {
                    // Errors are expected while shutting down; stop without logging.
                    break;
                }
                LOG.warn("{}: exception while fetching latest offset for partition {}. will be retried.", this, tp, e);
            }
        }
        LOG.debug("{}:  backlog {}", (Object)this, (Object)getSplitBacklogBytes());
    }

    /**
     * Publishes the split's backlog in bytes and in messages to the backlog gauges. Any negative
     * value is reported as {@code -1}.
     */
    private void reportBacklog() {
        long bytes = getSplitBacklogBytes();
        backlogBytesOfSplit.set(bytes < 0L ? -1L : bytes);

        long messages = getSplitBacklogMessageCount();
        backlogElementsOfSplit.set(messages < 0L ? -1L : messages);
    }

    /**
     * Sums the estimated number of unread messages over all partitions of this split.
     *
     * @return the total, or {@code -1} as soon as any partition reports an unknown backlog
     */
    private long getSplitBacklogMessageCount() {
        long total = 0L;
        Iterator<PartitionState<K, V>> states = partitionStates.iterator();
        while (states.hasNext()) {
            long partitionBacklog = states.next().backlogMessageCount();
            if (partitionBacklog != -1L) {
                total += partitionBacklog;
                continue;
            }
            // One unknown partition makes the whole split's backlog unknown.
            return -1L;
        }
        return total;
    }

    /**
     * Stops the background threads and releases the deserializers and both consumers.
     *
     * <p>Both executors are asked to shut down. Until they have terminated, the consumers are
     * woken up and the hand-off queue is drained repeatedly so that neither thread stays blocked.
     *
     * @throws IOException declared by the interface; close failures of the individual resources
     *     are swallowed by {@code Closeables.close(..., true)}
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    public void close() throws IOException {
        closed.set(true);
        consumerPollThread.shutdown();
        offsetFetcherThread.shutdown();

        boolean terminated;
        do {
            if (consumer != null) {
                consumer.wakeup();
            }
            if (offsetConsumer != null) {
                offsetConsumer.wakeup();
            }
            // Release the poll thread in case it is blocked offering a batch.
            availableRecordsQueue.poll();
            try {
                terminated = consumerPollThread.awaitTermination(10L, TimeUnit.SECONDS) && offsetFetcherThread.awaitTermination(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if (!terminated) {
                LOG.warn("An internal thread is taking a long time to shutdown. will retry.");
            }
        } while (!terminated);

        Closeables.close(keyDeserializerInstance, true);
        Closeables.close(valueDeserializerInstance, true);
        Closeables.close(offsetConsumer, true);
        Closeables.close(consumer, true);
    }

    /**
     * Mutable per-partition bookkeeping: the next offset to read, the latest offset seen on the
     * broker, the last watermark, the records of the current batch, and moving averages used to
     * estimate backlog.
     *
     * <p>The latest-offset setter and the backlog getters are {@code synchronized};
     * {@link #recordConsumed} and {@link #updateAndGetWatermark} are not.
     */
    private static class PartitionState<K, V> {
        private final TopicPartition topicPartition;
        private final TimestampPolicy<K, V> timestampPolicy;
        private final MovingAvg avgRecordSize = new MovingAvg();
        private final MovingAvg avgOffsetGap = new MovingAvg();
        private long nextOffset;
        private long latestOffset;
        private Instant latestOffsetFetchTime;
        private Instant lastWatermark;
        private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();

        /**
         * @param partition the topic partition this state tracks
         * @param nextOffset offset of the next record to read, or {@code -1} if not yet known
         * @param timestampPolicy policy that supplies record timestamps and the watermark
         */
        PartitionState(TopicPartition partition, long nextOffset, TimestampPolicy<K, V> timestampPolicy) {
            this.topicPartition = partition;
            this.timestampPolicy = timestampPolicy;
            this.nextOffset = nextOffset;
            // Latest offset is unknown until the first fetch.
            this.latestOffset = -1L;
            this.latestOffsetFetchTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
            this.lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        }

        /** Advances the next offset past {@code offset} and feeds both moving averages. */
        void recordConsumed(long offset, int size, long offsetGap) {
            nextOffset = offset + 1L;
            avgRecordSize.update(size);
            avgOffsetGap.update(offsetGap);
        }

        /** Stores the latest offset of the partition together with the time it was fetched. */
        synchronized void setLatestOffset(long latestOffset, Instant fetchTime) {
            this.latestOffset = latestOffset;
            this.latestOffsetFetchTime = fetchTime;
            LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg record size {})", this, topicPartition, latestOffset, nextOffset, avgRecordSize);
        }

        /**
         * Estimates the backlog in bytes as message backlog times average record size.
         *
         * @return the estimate, or {@code -1} if the message backlog is unknown
         */
        synchronized long approxBacklogInBytes() {
            long messages = backlogMessageCount();
            return messages == -1L ? -1L : (long)((double)messages * avgRecordSize.get());
        }

        /**
         * Estimates the number of unread messages, scaling the offset distance down by the
         * average gap between consecutive offsets.
         *
         * @return a non-negative estimate, or {@code -1} while either offset is still unknown
         */
        synchronized long backlogMessageCount() {
            boolean offsetsKnown = latestOffset >= 0L && nextOffset >= 0L;
            if (!offsetsKnown) {
                return -1L;
            }
            double offsetDistance = (double)(latestOffset - nextOffset);
            double remaining = offsetDistance / (1.0 + avgOffsetGap.get());
            return Math.max(0L, (long)Math.ceil(remaining));
        }

        /** Snapshots the current backlog estimate and offset fetch time for the timestamp policy. */
        synchronized TimestampPolicyContext mkTimestampPolicyContext() {
            return new TimestampPolicyContext(backlogMessageCount(), latestOffsetFetchTime);
        }

        /** Asks the timestamp policy for a fresh watermark, remembers it and returns it. */
        Instant updateAndGetWatermark() {
            Instant watermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
            lastWatermark = watermark;
            return watermark;
        }
    }

    /**
     * Immutable snapshot of a partition's message backlog and the time that backlog was
     * checked, handed to the timestamp policy.
     */
    private static class TimestampPolicyContext
    extends TimestampPolicy.PartitionContext {
        private final long messageBacklog;
        private final Instant backlogCheckTime;

        /**
         * @param messageBacklog estimated number of unread messages
         * @param backlogCheckTime time at which the backlog was last determined
         */
        TimestampPolicyContext(long messageBacklog, Instant backlogCheckTime) {
            this.backlogCheckTime = backlogCheckTime;
            this.messageBacklog = messageBacklog;
        }

        /** Returns the message backlog captured at construction. */
        @Override
        public long getMessageBacklog() {
            return messageBacklog;
        }

        /** Returns the backlog check time captured at construction. */
        @Override
        public Instant getBacklogCheckTime() {
            return backlogCheckTime;
        }
    }

    /**
     * Running average that behaves like a plain arithmetic mean for the first
     * {@code MOVING_AVG_WINDOW} updates and like an exponentially weighted average afterwards.
     */
    private static class MovingAvg {
        private static final int MOVING_AVG_WINDOW = 1000;
        private double avg = 0.0;
        private long numUpdates = 0L;

        private MovingAvg() {
        }

        /** Folds {@code quantity} into the average. */
        void update(double quantity) {
            numUpdates++;
            // The divisor grows with the sample count until it is capped at the window size.
            long divisor = Math.min((long)MOVING_AVG_WINDOW, numUpdates);
            avg += (quantity - avg) / (double)divisor;
        }

        /** Returns the current average; {@code 0.0} before the first update. */
        double get() {
            return avg;
        }
    }
}

