package io.github.changebooks.kafka;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/* loaded from: input_file:io/github/changebooks/kafka/KafkaBatchListener.class */
/**
 * Dispatches a batch of Kafka records to a {@link KafkaBatchConsumer}, either on the
 * calling thread or split into several sub-batches that are run on an {@link Executor}.
 *
 * <p>The number of sub-batches is driven by {@link #setThreadNum(int)}; with the default
 * value of {@code 1} every batch is consumed on the calling thread.
 */
public class KafkaBatchListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBatchListener.class);

    /** Executor that runs the sub-batches in {@link #asyncConsume(List, KafkaBatchContext)}. */
    private final Executor executor;

    /** Delegate that actually processes the records. */
    private final KafkaBatchConsumer consumer;

    /** Value handed to {@code PageUtils.compute}; values {@code <= 1} disable the async path. */
    private int threadNum = 1;

    /**
     * Creates a listener.
     *
     * @param executor           executor used for parallel consumption, must not be {@code null}
     * @param kafkaBatchConsumer delegate that processes the records, must not be {@code null}
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public KafkaBatchListener(Executor executor, KafkaBatchConsumer kafkaBatchConsumer) {
        Assert.notNull(executor, "executor can't be null");
        Assert.notNull(kafkaBatchConsumer, "consumer can't be null");
        this.executor = executor;
        this.consumer = kafkaBatchConsumer;
    }

    /**
     * Entry point for a polled batch.
     *
     * <p>A {@code null} or empty batch is logged as a warning and still forwarded to the
     * consumer on the calling thread. A non-empty batch is consumed on the calling thread when
     * {@code threadNum <= 1}, otherwise it is partitioned by {@code PageUtils.compute} and
     * handed to {@link #asyncConsume(List, KafkaBatchContext)}.
     *
     * @param list              the records to consume, may be {@code null} or empty
     * @param kafkaBatchContext optional context passed through to the consumer
     * @return {@code true} only if the consumer reported success for every part of the batch
     */
    public boolean listen(List<ConsumerRecord<String, String>> list, @Nullable KafkaBatchContext kafkaBatchContext) {
        if (list == null) {
            LOGGER.warn("listen warning, records can't be null");
            return consume(null, kafkaBatchContext);
        }
        if (list.isEmpty()) {
            LOGGER.warn("listen warning, records can't be empty");
            return consume(list, kafkaBatchContext);
        }
        if (this.threadNum <= 1) {
            return consume(list, kafkaBatchContext);
        }
        return asyncConsume(PageUtils.compute(list, this.threadNum), kafkaBatchContext);
    }

    /**
     * Consumes several sub-batches in parallel on the configured executor and blocks until
     * all of them have finished.
     *
     * <p>Zero sub-batches are forwarded as a single {@code null} batch and exactly one
     * sub-batch is consumed directly, both on the calling thread. The caller's MDC context
     * map, if any, is installed on each worker for the duration of its sub-batch and cleared
     * afterwards.
     *
     * @param list              the sub-batches, must not be {@code null}
     * @param kafkaBatchContext optional context passed through to the consumer
     * @return {@code true} if every sub-batch was consumed successfully; {@code false} if any
     *         sub-batch failed or the calling thread was interrupted while waiting
     */
    public boolean asyncConsume(@NonNull List<List<ConsumerRecord<String, String>>> list, @Nullable KafkaBatchContext kafkaBatchContext) {
        int size = list.size();
        if (size <= 0) {
            return consume(null, kafkaBatchContext);
        }
        if (size == 1) {
            return consume(list.get(0), kafkaBatchContext);
        }

        AtomicBoolean allSucceeded = new AtomicBoolean(true);
        CountDownLatch done = new CountDownLatch(size);
        // Snapshot taken on the calling thread so workers log with the same diagnostic context.
        Map<String, String> callerMdc = MDC.getCopyOfContextMap();

        for (List<ConsumerRecord<String, String>> records : list) {
            this.executor.execute(() -> {
                // Stays false unless consume() returns true, so an unexpected failure while
                // installing the MDC is also reported as a failed sub-batch.
                boolean consumed = false;
                try {
                    if (callerMdc != null) {
                        MDC.setContextMap(callerMdc);
                    }
                    consumed = consume(records, kafkaBatchContext);
                } finally {
                    if (!consumed) {
                        allSucceeded.set(false);
                    }
                    try {
                        if (callerMdc != null) {
                            // Do not leak the caller's context into the next task on this worker.
                            MDC.clear();
                        }
                    } finally {
                        // Must run exactly once per task, whatever happened above, otherwise
                        // the awaiting thread would block forever.
                        done.countDown();
                    }
                }
            });
        }

        try {
            done.await();
        } catch (InterruptedException e) {
            LOGGER.error("asyncConsume failed, throwable: ", e);
            // Preserve the interrupt for the caller and do not claim success for work that
            // may still be running.
            Thread.currentThread().interrupt();
            return false;
        }
        return allSucceeded.get();
    }

    /**
     * Delegates to the consumer on the current thread, converting any failure into
     * {@code false}.
     *
     * @param list              the records to consume, may be {@code null}
     * @param kafkaBatchContext optional context passed through to the consumer
     * @return the consumer's result, or {@code false} if it threw
     */
    public boolean consume(List<ConsumerRecord<String, String>> list, @Nullable KafkaBatchContext kafkaBatchContext) {
        try {
            return this.consumer.consume(list, kafkaBatchContext);
        } catch (Throwable th) {
            // Deliberately broad: a failing batch is reported via the return value, not thrown.
            LOGGER.error("consume failed, throwable: ", th);
            return false;
        }
    }

    /**
     * @return the executor used for parallel consumption
     */
    public Executor getExecutor() {
        return this.executor;
    }

    /**
     * @return the delegate consumer
     */
    public KafkaBatchConsumer getConsumer() {
        return this.consumer;
    }

    /**
     * @return the configured thread number
     */
    public int getThreadNum() {
        return this.threadNum;
    }

    /**
     * Sets the thread number used to partition batches in {@link #listen(List, KafkaBatchContext)}.
     *
     * @param i the new value, must be greater than 0
     * @return this listener, for chaining
     * @throws IllegalArgumentException if {@code i} is not greater than 0
     */
    public KafkaBatchListener setThreadNum(int i) {
        Assert.isTrue(i > 0, "threadNum must be greater than 0");
        this.threadNum = i;
        return this;
    }
}
