/*
 * Decompiled with CFR 0.152.
 */
package kafka.consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.FetchedDataChunk;
import kafka.consumer.PartitionTopicInfo;
import kafka.consumer.ZookeeperConsumerConnector$;
import kafka.message.Message;
import kafka.message.MessageSet$;
import kafka.utils.IteratorTemplate;
import org.apache.log4j.Logger;
import scala.ScalaObject;
import scala.collection.Iterator;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001E4\u0001\"\u0001\u0002\u0005\u0002\u0003\u0005\ta\u0002\u0002\u0011\u0007>t7/^7fe&#XM]1u_JT!a\u0001\u0003\u0002\u0011\r|gn];nKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001\u0001\u0002\u0006\t\u0004\u00131qQ\"\u0001\u0006\u000b\u0005-!\u0011!B;uS2\u001c\u0018BA\u0007\u000b\u0005AIE/\u001a:bi>\u0014H+Z7qY\u0006$X\r\u0005\u0002\u0010%5\t\u0001C\u0003\u0002\u0012\t\u00059Q.Z:tC\u001e,\u0017BA\n\u0011\u0005\u001diUm]:bO\u0016\u0004\"!\u0006\r\u000e\u0003YQ\u0011aF\u0001\u0006g\u000e\fG.Y\u0005\u00033Y\u00111bU2bY\u0006|%M[3di\"A1\u0004\u0001BC\u0002\u0013%A$A\u0004dQ\u0006tg.\u001a7\u0016\u0003u\u00012AH\u0013(\u001b\u0005y\"B\u0001\u0011\"\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003E\r\nA!\u001e;jY*\tA%\u0001\u0003kCZ\f\u0017B\u0001\u0014 \u00055\u0011En\\2lS:<\u0017+^3vKB\u0011\u0001&K\u0007\u0002\u0005%\u0011!F\u0001\u0002\u0011\r\u0016$8\r[3e\t\u0006$\u0018m\u00115v].D\u0001\u0002\f\u0001\u0003\u0002\u0003\u0006I!H\u0001\tG\"\fgN\\3mA!Aa\u0006\u0001B\u0001B\u0003%q&A\td_:\u001cX/\\3s)&lWm\\;u\u001bN\u0004\"!\u0006\u0019\n\u0005E2\"aA%oi\")1\u0007\u0001C\u0001i\u00051A(\u001b8jiz\"2!\u000e\u001c8!\tA\u0003\u0001C\u0003\u001ce\u0001\u0007Q\u0004C\u0003/e\u0001\u0007q\u0006C\u0004:\u0001\t\u0007I\u0011\u0002\u001e\u0002\r1|wmZ3s+\u0005Y\u0004C\u0001\u001fD\u001b\u0005i$B\u0001 @\u0003\u0015awn\u001a\u001bk\u0015\t\u0001\u0015)\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0005\u0006\u0019qN]4\n\u0005\u0011k$A\u0002'pO\u001e,'\u000f\u0003\u0004G\u0001\u0001\u0006IaO\u0001\bY><w-\u001a:!\u0011\u001dA\u0005\u00011A\u0005\n%\u000bqaY;se\u0016tG/F\u0001K!\rY5K\u0004\b\u0003\u0019Fs!!\u0014)\u000e\u00039S!a\u0014\u0004\u0002\rq\u0012xn\u001c;?\u0013\u00059\u0012B\u0001*\u0017\u0003\u001d\u0001\u0018mY6bO\u0016L!\u0001V+\u0003\u0011%#XM]1u_JT!A\u0015\f\t\u000f]\u0003\u0001\u0019!C\u00051\u0006Y1-\u001e:sK:$x\fJ3r)\tIF\f\u0005\u0002\u00165&\u00111L\u0006\u0002\u0005+:LG\u000fC\u0004^-\u0006\u0005\t\u0019\u0001&\u0002\u0007a$\u0013\u0007\u0003\u0004`\u0001\u0001\u0006KAS\u0001\tGV\u0014(/\u001a8uA!9\u0011\r\u0001a\u0001\n\u0013\u0011\u0017\u0001E2veJ,g\u000e\u001e+pa&\u001c\u0017J\u001c4p+\u0005\u0019\u0007C\u0001\u0015e\u0013\t)'A\u0001\nQCJ$\u0018\u000e^5p]R{\u0007/[2J]\u001a|\u0007bB4\u0001\u0001\u0004%I\u0001[\u0001\u0015GV\u0014(/\u001a8u)>\u0004\u0018nY%oM>|F%Z9\u0015\u0005eK\u0007bB/g\u0003\u0003\u0005\ra\u0019\u0005\u0007W\u0002\u0001\u000b\u0015B2\u0002#\r,(O]3oiR{\u0007/[2J]\u001a|\u0007\u0005C\u0003n\u0001\u0011\u0005c.\u0001\u0003oKb$H#\u0001\b\t\u000bA\u0004A\u0011\u00038\u0002\u00115\f7.\u001a(fqR\u0004")
public class ConsumerIterator
extends IteratorTemplate<Message>
implements ScalaObject {
    private final BlockingQueue<FetchedDataChunk> channel;
    private final int consumerTimeoutMs;
    private final Logger logger;
    private Iterator<Message> current;
    private PartitionTopicInfo currentTopicInfo;

    private BlockingQueue<FetchedDataChunk> channel() {
        return this.channel;
    }

    private Logger logger() {
        return this.logger;
    }

    private Iterator<Message> current() {
        return this.current;
    }

    private void current_$eq(Iterator<Message> iterator) {
        this.current = iterator;
    }

    private PartitionTopicInfo currentTopicInfo() {
        return this.currentTopicInfo;
    }

    private void currentTopicInfo_$eq(PartitionTopicInfo partitionTopicInfo) {
        this.currentTopicInfo = partitionTopicInfo;
    }

    /*
     * WARNING - void declaration
     */
    @Override
    public Message next() {
        void var1_1;
        Message message = (Message)super.next();
        this.currentTopicInfo().consumed(MessageSet$.MODULE$.entrySize(message));
        return var1_1;
    }

    @Override
    public Message makeNext() {
        if (this.current() == null || !this.current().hasNext()) {
            FetchedDataChunk found = null;
            if (this.consumerTimeoutMs < 0) {
                found = this.channel().take();
            } else {
                found = this.channel().poll(this.consumerTimeoutMs, TimeUnit.MILLISECONDS);
                if (found == null) {
                    this.logger().debug((Object)"Consumer iterator timing out..");
                    throw new ConsumerTimeoutException();
                }
            }
            if (found == ZookeeperConsumerConnector$.MODULE$.shutdownCommand()) {
                this.logger().debug((Object)"Received the shutdown command");
                this.channel().offer(found);
                return (Message)this.allDone();
            }
            this.currentTopicInfo_$eq(found.topicInfo());
            if (this.currentTopicInfo().getConsumeOffset() != found.fetchOffset()) {
                this.logger().error((Object)new StringBuilder().append((Object)"consumed offset: ").append((Object)BoxesRunTime.boxToLong((long)this.currentTopicInfo().getConsumeOffset())).append((Object)" doesn't match fetch offset: ").append((Object)BoxesRunTime.boxToLong((long)found.fetchOffset())).append((Object)" for ").append((Object)this.currentTopicInfo()).append((Object)"; consumer may lose data").toString());
                this.currentTopicInfo().resetConsumeOffset(found.fetchOffset());
            }
            this.current_$eq(found.messages().iterator());
        }
        return (Message)this.current().next();
    }

    public ConsumerIterator(BlockingQueue<FetchedDataChunk> channel, int consumerTimeoutMs) {
        this.channel = channel;
        this.consumerTimeoutMs = consumerTimeoutMs;
        this.logger = Logger.getLogger(ConsumerIterator.class);
        this.current = null;
        this.currentTopicInfo = null;
    }
}

