package org.pragmaticminds.crunch.execution;

import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.pragmaticminds.crunch.LoggingUtil;
import org.pragmaticminds.crunch.api.values.UntypedValues;
import org.pragmaticminds.crunch.execution.MRecordSource;
import org.pragmaticminds.crunch.serialization.JsonDeserializerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/pragmaticminds/crunch/execution/KafkaMRecordSource.class */
/**
 * {@link MRecordSource} that reads {@link UntypedValues} from one or more Kafka topics.
 *
 * <p>The source is unbounded: {@link #hasRemaining()} always returns {@code true},
 * {@link #getKind()} reports {@code INFINITE}, and {@link #m1get()} blocks (polling in
 * {@link #POLL_TIMEOUT_MS} slices) until a record is available.
 *
 * <p>Offsets are auto-committed every second (see {@code initialize}).
 *
 * <p>NOTE(review): both fields are {@code transient}; nothing in this class re-creates the
 * consumer, so an instance that did not go through one of the constructors has no consumer.
 */
public class KafkaMRecordSource implements MRecordSource {
    /** Maximum time in milliseconds a single {@code poll} call waits for new records. */
    static final long POLL_TIMEOUT_MS = 1000;
    private static final Logger logger = LoggerFactory.getLogger(KafkaMRecordSource.class);
    private transient KafkaConsumer<String, UntypedValues> consumer;
    private transient Iterator<ConsumerRecord<String, UntypedValues>> recordIterator;

    /**
     * Wraps an already configured consumer. Package-private; no subscription is performed here,
     * so the given consumer is used exactly as handed in.
     *
     * @param kafkaConsumer consumer to poll records from
     */
    KafkaMRecordSource(KafkaConsumer<String, UntypedValues> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    /**
     * Creates a source subscribed to the given topics, without forcing an offset reset policy.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} consumer property
     * @param groupId value for the {@code group.id} consumer property
     * @param topics topics to subscribe to
     */
    public KafkaMRecordSource(String bootstrapServers, String groupId, Collection<String> topics) {
        initialize(bootstrapServers, groupId, topics, null, false);
    }

    /**
     * Creates a source subscribed to the given topics.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} consumer property
     * @param groupId value for the {@code group.id} consumer property
     * @param topics topics to subscribe to
     * @param readFromBeginning if {@code true}, {@code auto.offset.reset} is set to {@code earliest}
     */
    public KafkaMRecordSource(String bootstrapServers, String groupId, Collection<String> topics,
                              boolean readFromBeginning) {
        initialize(bootstrapServers, groupId, topics, null, readFromBeginning);
    }

    /**
     * Creates a source subscribed to the given topics with additional consumer properties.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} consumer property
     * @param groupId value for the {@code group.id} consumer property
     * @param topics topics to subscribe to
     * @param additionalProperties extra consumer properties, may be {@code null}; the map is
     *     copied and never modified. Entries for {@code bootstrap.servers}, {@code group.id},
     *     {@code enable.auto.commit} and {@code auto.commit.interval.ms} are overridden.
     */
    public KafkaMRecordSource(String bootstrapServers, String groupId, Collection<String> topics,
                              Map<String, Object> additionalProperties) {
        initialize(bootstrapServers, groupId, topics, additionalProperties, false);
    }

    /**
     * Builds the consumer configuration, creates the consumer and subscribes it to the topics.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} consumer property
     * @param groupId value for the {@code group.id} consumer property
     * @param topics topics to subscribe to
     * @param additionalProperties extra consumer properties or {@code null}
     * @param readFromBeginning if {@code true}, {@code auto.offset.reset} is set to {@code earliest}
     */
    private void initialize(String bootstrapServers, String groupId, Collection<String> topics,
                            Map<String, Object> additionalProperties, boolean readFromBeginning) {
        // Work on a private copy: writing into the caller's map would leak our settings into it
        // and would fail outright if the caller handed in an unmodifiable map.
        Map<String, Object> properties = new HashMap<>();
        if (additionalProperties != null) {
            properties.putAll(additionalProperties);
        }
        // These settings are applied after the caller's entries and therefore always win.
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        if (readFromBeginning) {
            properties.put("auto.offset.reset", "earliest");
        }
        this.consumer = new KafkaConsumer<>(properties, new JsonDeserializerWrapper(String.class), new JsonDeserializerWrapper(UntypedValues.class));
        this.consumer.subscribe(topics);
    }

    /**
     * Returns the value of the next record, blocking until one has been polled.
     *
     * <p>When trace logging is enabled, the offset and record timestamp are logged for every
     * offset that is a multiple of {@code LoggingUtil.getTraceLogReportCheckpoint()}.
     *
     * @return the value of the next consumed record
     */
    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public UntypedValues m1get() {
        getMRecordsIfNoneAvailable();
        ConsumerRecord<String, UntypedValues> next = this.recordIterator.next();
        UntypedValues value = next.value();
        if (logger.isTraceEnabled() && next.offset() % LoggingUtil.getTraceLogReportCheckpoint() == 0) {
            // A record may carry no value; logging must not fail (only when tracing is on) for it.
            Instant timestamp = value == null ? null : Instant.ofEpochMilli(value.getTimestamp());
            logger.trace("Offset {} @ {}", next.offset(), timestamp);
        }
        return value;
    }

    /**
     * Polls the consumer until the current batch iterator has at least one record left.
     * Loops indefinitely while the topic stays empty.
     */
    private void getMRecordsIfNoneAvailable() {
        while (this.recordIterator == null || !this.recordIterator.hasNext()) {
            this.recordIterator = this.consumer.poll(POLL_TIMEOUT_MS).iterator();
        }
    }

    /**
     * @return always {@code true}; this source never reports being exhausted
     */
    public boolean hasRemaining() {
        return true;
    }

    /** No-op; the consumer is created and subscribed in the constructors. */
    public void init() {
    }

    /** Closes the underlying Kafka consumer, if there is one. */
    public void close() {
        // The consumer field is transient and only assigned by the constructors, so guard
        // against closing an instance that never got one.
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    /**
     * @return {@code MRecordSource.Kind.INFINITE}
     */
    public MRecordSource.Kind getKind() {
        return MRecordSource.Kind.INFINITE;
    }
}
