/*
 * Decompiled with CFR 0.152.
 */
package io.github.changebooks.kafka;

import io.github.changebooks.kafka.KafkaBatchConsumer;
import io.github.changebooks.kafka.KafkaBatchContext;
import io.github.changebooks.kafka.PageUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * Dispatches a batch of Kafka records to a {@link KafkaBatchConsumer}, either on the
 * calling thread or split into pages that are consumed in parallel on an {@link Executor}.
 *
 * <p>The batch is reported as successful only when every page was consumed successfully.
 * Any {@link Throwable} thrown by the consumer is logged and treated as a failed page.
 */
public class KafkaBatchListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBatchListener.class);

    /** Runs the per-page tasks when the batch is consumed in parallel. */
    private final Executor executor;

    /** Business callback that actually processes the records. */
    private final KafkaBatchConsumer consumer;

    /** Page count passed to {@code PageUtils.compute}; values of 1 or less disable parallel consumption. */
    private int threadNum = 1;

    /**
     * Creates a listener.
     *
     * @param executor executor used for parallel consumption, must not be {@code null}
     * @param consumer callback that processes the records, must not be {@code null}
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public KafkaBatchListener(Executor executor, KafkaBatchConsumer consumer) {
        Assert.notNull(executor, "executor can't be null");
        Assert.notNull(consumer, "consumer can't be null");
        this.executor = executor;
        this.consumer = consumer;
    }

    /**
     * Entry point: consumes the given batch.
     *
     * <p>A {@code null} or empty batch is logged as a warning and still handed to the
     * consumer on the calling thread. When {@code threadNum} is 1 the batch is consumed
     * on the calling thread; otherwise it is paged with {@code PageUtils.compute} and the
     * pages are consumed via {@link #asyncConsume(List, KafkaBatchContext)}.
     *
     * @param records records to consume, may be {@code null} or empty
     * @param context optional context forwarded unchanged to the consumer
     * @return {@code true} if the whole batch was consumed successfully
     */
    public boolean listen(List<ConsumerRecord<String, String>> records, @Nullable KafkaBatchContext context) {
        if (records == null) {
            LOGGER.warn("listen warning, records can't be null");
            return this.consume(null, context);
        }
        if (records.isEmpty()) {
            LOGGER.warn("listen warning, records can't be empty");
            return this.consume(records, context);
        }
        if (this.threadNum <= 1) {
            return this.consume(records, context);
        }
        List<List<ConsumerRecord<String, String>>> pageList = PageUtils.compute(records, this.threadNum);
        return this.asyncConsume(pageList, context);
    }

    /**
     * Consumes every page on the executor and blocks until all pages have finished.
     *
     * <p>No pages means the consumer is invoked once with {@code null} records; a single
     * page is consumed directly on the calling thread. The caller's MDC is copied into each
     * task so that log lines written while consuming keep the caller's logging context.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored
     * and {@code false} is returned, because pages may still be running and the batch
     * cannot be reported as consumed.
     *
     * @param pageList pages of records, must not be {@code null}
     * @param context  optional context forwarded unchanged to the consumer
     * @return {@code true} only if every page was consumed successfully
     */
    public boolean asyncConsume(@NonNull List<List<ConsumerRecord<String, String>>> pageList, @Nullable KafkaBatchContext context) {
        int size = pageList.size();
        if (size <= 0) {
            return this.consume(null, context);
        }
        if (size == 1) {
            return this.consume(pageList.get(0), context);
        }
        AtomicBoolean result = new AtomicBoolean(true);
        CountDownLatch lock = new CountDownLatch(size);
        Map<String, String> logContext = MDC.getCopyOfContextMap();
        for (List<ConsumerRecord<String, String>> records : pageList) {
            this.executor.execute(() -> {
                // MDC of the executing thread before this task touched it; restored afterwards.
                Map<String, String> previousContext = null;
                try {
                    if (logContext != null) {
                        previousContext = MDC.getCopyOfContextMap();
                        MDC.setContextMap(logContext);
                    }
                    if (!this.consume(records, context)) {
                        result.set(false);
                    }
                } finally {
                    lock.countDown();
                    if (logContext != null) {
                        // Restore rather than blindly clear: the executor may run the task on the
                        // submitting thread (e.g. a caller-runs policy), and clearing there would
                        // wipe the caller's own logging context.
                        if (previousContext != null) {
                            MDC.setContextMap(previousContext);
                        } else {
                            MDC.clear();
                        }
                    }
                }
            });
        }
        try {
            lock.await();
        } catch (InterruptedException tr) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            LOGGER.error("asyncConsume failed, throwable: ", tr);
            // Pages may still be in flight, so the batch must not be reported as successful.
            return false;
        }
        return result.get();
    }

    /**
     * Invokes the consumer on the calling thread, converting any failure into {@code false}.
     *
     * @param records records to consume, may be {@code null}
     * @param context optional context forwarded unchanged to the consumer
     * @return the consumer's result, or {@code false} if it threw
     */
    public boolean consume(List<ConsumerRecord<String, String>> records, @Nullable KafkaBatchContext context) {
        try {
            return this.consumer.consume(records, context);
        } catch (Throwable tr) {
            // Deliberate catch-all: a failing page is reported as false instead of propagating.
            LOGGER.error("consume failed, throwable: ", tr);
            return false;
        }
    }

    /**
     * @return the executor used for parallel consumption
     */
    public Executor getExecutor() {
        return this.executor;
    }

    /**
     * @return the consumer callback
     */
    public KafkaBatchConsumer getConsumer() {
        return this.consumer;
    }

    /**
     * @return the configured thread/page count
     */
    public int getThreadNum() {
        return this.threadNum;
    }

    /**
     * Sets the page count used when splitting a batch for parallel consumption.
     *
     * @param threadNum page count, must be greater than 0
     * @return this listener, for chaining
     * @throws IllegalArgumentException if {@code threadNum} is not positive
     */
    public KafkaBatchListener setThreadNum(int threadNum) {
        Assert.isTrue(threadNum > 0, "threadNum must be greater than 0");
        this.threadNum = threadNum;
        return this;
    }
}

