/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.kafka;

import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.kafka.KafkaInputSplit;
import org.apache.hadoop.hive.kafka.KafkaRecordIterator;
import org.apache.hadoop.hive.kafka.KafkaTableProperties;
import org.apache.hadoop.hive.kafka.KafkaUtils;
import org.apache.hadoop.hive.kafka.KafkaWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafkaesqueesque.clients.consumer.ConsumerRecord;
import org.apache.kafkaesqueesque.clients.consumer.KafkaConsumer;
import org.apache.kafkaesqueesque.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Record reader that serves the Kafka records described by a single {@link KafkaInputSplit}.
 *
 * <p>The class extends the {@code mapreduce} {@link RecordReader} and also implements the
 * {@code mapred} {@code RecordReader} interface, so one instance can be driven through either
 * set of methods. Keys are always {@link NullWritable}; each value is a {@link KafkaWritable}
 * populated from one {@link ConsumerRecord}.
 *
 * <p>Initialization happens at most once per instance: it is guarded by the {@code started} flag
 * and performed under the instance lock. The read methods themselves are not synchronized.
 */
public class KafkaRecordReader
        extends RecordReader<NullWritable, KafkaWritable>
        implements org.apache.hadoop.mapred.RecordReader<NullWritable, KafkaWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);

    /** Consumer used to fetch records; created lazily by {@link #initConsumer()}. */
    private KafkaConsumer<byte[], byte[]> consumer = null;
    /** Job configuration supplied at initialization time. */
    private Configuration config = null;
    /** Value produced by the most recent successful {@link #nextKeyValue()} call, or null. */
    private KafkaWritable currentWritableValue;
    /** Source of records for the split; an {@link EmptyIterator} when the split is empty. */
    private Iterator<ConsumerRecord<byte[], byte[]>> recordsCursor = null;
    /** Number of records the split is expected to contain (end offset minus start offset). */
    private long totalNumberRecords = 0L;
    /** Number of records handed out so far. */
    private long consumedRecords = 0L;
    /** Sum of the serialized value sizes of the records handed out so far. */
    private long readBytes = 0L;
    /** Set to true once initialization has completed successfully. */
    private volatile boolean started = false;

    /**
     * Creates an uninitialized reader; {@link #initialize(InputSplit, TaskAttemptContext)} has to
     * be called before any record can be read.
     */
    public KafkaRecordReader() {
    }

    /**
     * Creates a reader and initializes it immediately for the given split.
     *
     * @param inputSplit split describing the topic, partition and offset range to read
     * @param jobConf configuration used to build the Kafka consumer and to read the poll timeout
     */
    public KafkaRecordReader(KafkaInputSplit inputSplit, Configuration jobConf) {
        this.initialize(inputSplit, jobConf);
    }

    /**
     * Creates the Kafka consumer from the current configuration if it does not exist yet.
     *
     * @throws NullPointerException if the consumer properties carry no {@code bootstrap.servers}
     */
    private void initConsumer() {
        if (consumer != null) {
            return;
        }
        LOG.info("Initializing Kafka Consumer");
        Properties properties = KafkaUtils.consumerProperties(config);
        String brokerString = properties.getProperty("bootstrap.servers");
        Preconditions.checkNotNull(brokerString, "broker end point can not be null");
        LOG.info("Starting Consumer with Kafka broker string [{}]", brokerString);
        consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Performs the one-time setup for the given split: validates the offset range, creates the
     * consumer and builds the record cursor. Calls made after a successful initialization are
     * no-ops.
     *
     * <p>If anything fails after the consumer has been created, the consumer is closed before the
     * exception is rethrown. Without this, a failure on the constructor path would leave an open
     * consumer that no caller holds a reference to.
     *
     * @param inputSplit split describing the topic, partition and offset range to read
     * @param jobConf configuration used to build the consumer and to read the poll timeout
     * @throws IllegalStateException if the start offset is negative or greater than the end offset
     */
    private synchronized void initialize(KafkaInputSplit inputSplit, Configuration jobConf) {
        if (started) {
            return;
        }
        config = jobConf;
        long startOffset = inputSplit.getStartOffset();
        long endOffset = inputSplit.getEndOffset();
        TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
        Preconditions.checkState(startOffset >= 0L && startOffset <= endOffset,
                "Start [%s] has to be positive and Less than or equal to End [%s]", startOffset, endOffset);
        totalNumberRecords += endOffset - startOffset;
        try {
            initConsumer();
            long pollTimeout = config.getLong(KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName(), -1L);
            LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
            if (startOffset == endOffset) {
                // Nothing to read for an empty range, so no Kafka-backed iterator is needed.
                recordsCursor = new EmptyIterator();
            } else {
                recordsCursor = new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
            }
            started = true;
        } catch (RuntimeException initFailure) {
            releaseConsumerAfterFailedInit(initFailure);
            throw initFailure;
        }
    }

    /**
     * Closes and forgets the consumer after a failed initialization so it is not leaked.
     * A failure while closing is attached to {@code initFailure} as a suppressed exception
     * instead of replacing it.
     *
     * @param initFailure the exception that aborted initialization
     */
    private void releaseConsumerAfterFailedInit(RuntimeException initFailure) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.close();
        } catch (RuntimeException closeFailure) {
            initFailure.addSuppressed(closeFailure);
        } finally {
            consumer = null;
        }
    }

    /**
     * Initializes the reader from a generic split and task context.
     *
     * @param inputSplit the split to read; it is cast to {@link KafkaInputSplit}
     * @param context task context whose configuration is used for the setup
     */
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
        this.initialize((KafkaInputSplit) inputSplit, context.getConfiguration());
    }

    /**
     * Reads the next record into the supplied value and updates the record and byte counters.
     *
     * @param nullWritable unused key placeholder
     * @param bytesWritable value object that receives the next record
     * @return true if a record was read; false if the reader is not initialized or is exhausted
     */
    public boolean next(NullWritable nullWritable, KafkaWritable bytesWritable) {
        if (!started || !recordsCursor.hasNext()) {
            return false;
        }
        ConsumerRecord<byte[], byte[]> record = recordsCursor.next();
        bytesWritable.set(record);
        consumedRecords++;
        readBytes += record.serializedValueSize();
        return true;
    }

    /**
     * @return the shared {@link NullWritable} instance; this reader produces no keys
     */
    public NullWritable createKey() {
        return NullWritable.get();
    }

    /**
     * @return a new, empty {@link KafkaWritable} to be filled by {@link #next}
     */
    public KafkaWritable createValue() {
        return new KafkaWritable();
    }

    /**
     * @return always {@code -1}; this reader does not report a position
     */
    public long getPos() {
        return -1L;
    }

    /**
     * Advances to the next record and stores it for {@link #getCurrentValue()}.
     * A fresh {@link KafkaWritable} is allocated for every call; when no record is available the
     * current value is cleared.
     *
     * @return true if a record was read, false otherwise
     */
    public boolean nextKeyValue() {
        currentWritableValue = new KafkaWritable();
        if (next(NullWritable.get(), currentWritableValue)) {
            return true;
        }
        currentWritableValue = null;
        return false;
    }

    /**
     * @return the shared {@link NullWritable} instance; this reader produces no keys
     */
    public NullWritable getCurrentKey() {
        return NullWritable.get();
    }

    /**
     * @return the value read by the last successful {@link #nextKeyValue()} call
     * @throws NullPointerException if there is no current value
     */
    public KafkaWritable getCurrentValue() {
        return Preconditions.checkNotNull(currentWritableValue);
    }

    /**
     * Reports the fraction of the split's records consumed so far.
     *
     * @return {@code 0} while nothing has been consumed, {@code 1} once the consumed count has
     *     reached the expected total, otherwise consumed divided by total
     */
    public float getProgress() {
        if (consumedRecords == 0L) {
            return 0.0f;
        }
        if (consumedRecords >= totalNumberRecords) {
            return 1.0f;
        }
        return (float) consumedRecords / (float) totalNumberRecords;
    }

    /**
     * Logs the number of bytes read and, if a consumer was created, wakes it up and closes it.
     */
    public void close() {
        LOG.trace("total read bytes [{}]", readBytes);
        if (consumer != null) {
            consumer.wakeup();
            consumer.close();
        }
    }

    /**
     * Iterator that never yields a record; used when the split's start and end offsets are equal.
     */
    static final class EmptyIterator
            implements Iterator<ConsumerRecord<byte[], byte[]>> {
        EmptyIterator() {
        }

        @Override
        public boolean hasNext() {
            return false;
        }

        /**
         * @throws IllegalStateException always, because there is nothing to return
         */
        @Override
        public ConsumerRecord<byte[], byte[]> next() {
            throw new IllegalStateException("this is an empty iterator");
        }
    }
}

