/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.kafka;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HighLevelConsumer<K, V>
extends AbstractIdleService {
    private static final Logger log = LoggerFactory.getLogger(HighLevelConsumer.class);
    public static final String CONSUMER_CLIENT_FACTORY_CLASS_KEY = "kafka.consumerClientClassFactory";
    private static final String DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS = "org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory";
    public static final String ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
    public static final boolean DEFAULT_AUTO_COMMIT_VALUE = false;
    public static final String GROUP_ID_KEY = "group.id";
    private static final String DEFAULT_GROUP_ID = "KafkaJobSpecMonitor";
    public static final String OFFSET_COMMIT_NUM_RECORDS_THRESHOLD_KEY = "offsets.commit.num.records.threshold";
    public static final int DEFAULT_OFFSET_COMMIT_NUM_RECORDS_THRESHOLD = 100;
    public static final String OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY = "offsets.commit.time.threshold.secs";
    public static final int DEFAULT_OFFSET_COMMIT_TIME_THRESHOLD_SECS = 10;
    protected final String topic;
    protected final Config config;
    private final int numThreads;
    private MetricContext metricContext;
    private Counter messagesRead;
    private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
    private final ScheduledExecutorService consumerExecutor;
    private final ExecutorService queueExecutor;
    private final BlockingQueue[] queues;
    private final AtomicInteger recordsProcessed;
    private final Map<KafkaPartition, Long> partitionOffsetsToCommit;
    private final boolean enableAutoCommit;
    private final int offsetsCommitNumRecordsThreshold;
    private final int offsetsCommitTimeThresholdSecs;
    private long lastCommitTime = System.currentTimeMillis();
    private static final Config FALLBACK = ConfigFactory.parseMap((Map)ImmutableMap.builder().put((Object)"group.id", (Object)"KafkaJobSpecMonitor").put((Object)"enable.auto.commit", (Object)false).put((Object)"kafka.consumerClientClassFactory", (Object)"org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory").put((Object)"offsets.commit.num.records.threshold", (Object)100).put((Object)"offsets.commit.time.threshold.secs", (Object)10).build());

    public HighLevelConsumer(String topic, Config config, int numThreads) {
        this.topic = topic;
        this.numThreads = numThreads;
        this.config = config.withFallback((ConfigMergeable)FALLBACK);
        this.gobblinKafkaConsumerClient = this.createConsumerClient(this.config);
        this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
                HighLevelConsumer.this.copyAndCommit();
                HighLevelConsumer.this.partitionOffsetsToCommit.clear();
            }

            public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
            }
        });
        this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)log), (Optional)Optional.of((Object)"HighLevelConsumerThread")));
        this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)log), (Optional)Optional.of((Object)"QueueProcessor-%d")));
        this.queues = new LinkedBlockingQueue[numThreads];
        for (int i = 0; i < this.queues.length; ++i) {
            this.queues[i] = new LinkedBlockingQueue();
        }
        this.recordsProcessed = new AtomicInteger(0);
        this.partitionOffsetsToCommit = new ConcurrentHashMap<KafkaPartition, Long>();
        this.enableAutoCommit = ConfigUtils.getBoolean((Config)config, (String)ENABLE_AUTO_COMMIT_KEY, (boolean)false);
        this.offsetsCommitNumRecordsThreshold = ConfigUtils.getInt((Config)config, (String)OFFSET_COMMIT_NUM_RECORDS_THRESHOLD_KEY, (Integer)100);
        this.offsetsCommitTimeThresholdSecs = ConfigUtils.getInt((Config)config, (String)OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY, (Integer)10);
    }

    protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
        String kafkaConsumerClientClass = config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY);
        try {
            Class<?> clientFactoryClass = Class.forName(kafkaConsumerClientClass);
            GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory factory = (GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory)ConstructorUtils.invokeConstructor(clientFactoryClass, (Object[])new Object[0]);
            return factory.create(config);
        }
        catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to instantiate Kafka consumer client " + kafkaConsumerClientClass, e);
        }
    }

    @VisibleForTesting
    protected void buildMetricsContextAndMetrics() {
        this.metricContext = Instrumented.getMetricContext((State)new State(ConfigUtils.configToProperties((Config)this.config)), ((Object)((Object)this)).getClass(), this.getTagsForMetrics());
        this.createMetrics();
    }

    @VisibleForTesting
    protected void shutdownMetrics() throws IOException {
        if (this.metricContext != null) {
            this.metricContext.close();
        }
    }

    protected void createMetrics() {
        this.messagesRead = this.metricContext.counter("gobblin.kafka.highLevelConsumer.messagesRead");
    }

    protected List<Tag<?>> getTagsForMetrics() {
        ArrayList tags = Lists.newArrayList();
        tags.add(new Tag("topic", (Object)this.topic));
        tags.add(new Tag("groupId", (Object)this.config.getString(GROUP_ID_KEY)));
        return tags;
    }

    protected abstract void processMessage(DecodeableKafkaRecord<K, V> var1);

    protected void startUp() {
        this.buildMetricsContextAndMetrics();
        this.processQueues();
        this.consumerExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                HighLevelConsumer.this.consume();
            }
        }, 0L, 50L, TimeUnit.MILLISECONDS);
    }

    private void consume() {
        try {
            Iterator itr = this.gobblinKafkaConsumerClient.consume();
            if (!this.enableAutoCommit) {
                this.commitOffsets();
            }
            while (itr.hasNext()) {
                KafkaConsumerRecord record = (KafkaConsumerRecord)itr.next();
                int idx = record.getPartition() % this.numThreads;
                this.queues[idx].put(record);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processQueues() {
        for (BlockingQueue queue : this.queues) {
            this.queueExecutor.execute(new QueueProcessor(queue));
        }
    }

    private void commitOffsets() {
        if (this.shouldCommitOffsets()) {
            this.copyAndCommit();
        }
    }

    @VisibleForTesting
    protected void commitOffsets(Map<KafkaPartition, Long> partitionOffsets) {
        this.gobblinKafkaConsumerClient.commitOffsetsAsync(partitionOffsets);
    }

    private void copyAndCommit() {
        HashMap<KafkaPartition, Long> copy = new HashMap<KafkaPartition, Long>(this.partitionOffsetsToCommit);
        this.recordsProcessed.set(0);
        this.lastCommitTime = System.currentTimeMillis();
        this.commitOffsets(copy);
    }

    private boolean shouldCommitOffsets() {
        return this.recordsProcessed.intValue() >= this.offsetsCommitNumRecordsThreshold || (System.currentTimeMillis() - this.lastCommitTime) / 1000L >= (long)this.offsetsCommitTimeThresholdSecs;
    }

    public void shutDown() {
        ExecutorsUtils.shutdownExecutorService((ExecutorService)this.consumerExecutor, (Optional)Optional.of((Object)log), (long)5000L, (TimeUnit)TimeUnit.MILLISECONDS);
        ExecutorsUtils.shutdownExecutorService((ExecutorService)this.queueExecutor, (Optional)Optional.of((Object)log), (long)5000L, (TimeUnit)TimeUnit.MILLISECONDS);
        try {
            this.gobblinKafkaConsumerClient.close();
            this.shutdownMetrics();
        }
        catch (IOException e) {
            log.warn("Failed to shut down consumer client or metrics ", (Throwable)e);
        }
    }

    public String getTopic() {
        return this.topic;
    }

    public MetricContext getMetricContext() {
        return this.metricContext;
    }

    public GobblinKafkaConsumerClient getGobblinKafkaConsumerClient() {
        return this.gobblinKafkaConsumerClient;
    }

    class QueueProcessor
    implements Runnable {
        private final BlockingQueue<KafkaConsumerRecord> queue;

        public QueueProcessor(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            log.info("Starting queue processing.. " + Thread.currentThread().getName());
            try {
                while (true) {
                    KafkaConsumerRecord record = this.queue.take();
                    HighLevelConsumer.this.messagesRead.inc();
                    HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
                    HighLevelConsumer.this.recordsProcessed.incrementAndGet();
                    if (HighLevelConsumer.this.enableAutoCommit) continue;
                    KafkaPartition partition = new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
                    HighLevelConsumer.this.partitionOffsetsToCommit.put(partition, record.getOffset() + 1L);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

