package io.github.changebooks.kafka;

import io.github.changebooks.log.LogClear;
import io.github.changebooks.log.LogId;
import io.github.changebooks.log.LogTraceId;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/changebooks/kafka/LogConsumerInterceptor.class */
/**
 * Consumer interceptor that prepares logging identifiers when a batch of
 * records is consumed and clears them when offsets are committed.
 *
 * <p>On consume, headers are looked up via {@code KafkaHeaders.getConsumer};
 * when none are found, {@code LogTraceId} and {@code LogId} are initialised
 * directly, otherwise the headers are handed to {@code KafkaTraceId} and
 * {@code KafkaLogId}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class LogConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogConsumerInterceptor.class);

    /**
     * Runs {@link #processLog(ConsumerRecords)} for a non-null batch and hands
     * the batch back unchanged.
     *
     * <p>Anything thrown by {@code processLog} is logged at error level and
     * suppressed, so a failure here never prevents the records from being
     * returned.
     *
     * @param consumerRecords the consumed batch; may be {@code null}
     * @return the same reference that was passed in ({@code null} for {@code null})
     */
    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> consumerRecords) {
        if (consumerRecords != null) {
            try {
                processLog(consumerRecords);
            } catch (Throwable cause) {
                // Best effort: log and carry on, the batch is still returned below.
                LOGGER.error("onConsume failed, throwable: ", cause);
            }
        }
        return consumerRecords;
    }

    /**
     * Sets up the trace id and log id for the given batch.
     *
     * <p>If {@code KafkaHeaders.getConsumer} yields headers, they are passed to
     * {@code KafkaTraceId.onConsume} and {@code KafkaLogId.onConsume};
     * otherwise {@code LogTraceId.init()} and {@code LogId.init()} are called.
     *
     * @param consumerRecords the batch whose headers are looked up
     */
    public void processLog(ConsumerRecords<K, V> consumerRecords) {
        final Headers headers = KafkaHeaders.getConsumer(consumerRecords);
        if (headers != null) {
            KafkaTraceId.onConsume(headers);
            KafkaLogId.onConsume(headers);
            return;
        }

        // No headers available: initialise the identifiers without them.
        LogTraceId.init();
        LogId.init();
    }

    /**
     * Calls {@code LogClear.clear()}; the committed offsets are not inspected.
     *
     * @param map the committed offsets per partition (unused)
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        LogClear.clear();
    }

    /** No-op: this interceptor does nothing on close. */
    @Override
    public void close() {
        // nothing to do
    }

    /**
     * No-op: the supplied configuration is ignored.
     *
     * @param map the configuration entries (unused)
     */
    @Override
    public void configure(Map<String, ?> map) {
        // nothing to do
    }
}
