/*
 * Decompiled with CFR 0.152.
 */
package co.elastic.apm.agent.kafka;

import co.elastic.apm.agent.bci.TracerAwareInstrumentation;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.kafka.BaseKafkaInstrumentation;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.util.LoggerUtils;
import co.elastic.apm.agent.util.PrivilegedActionUtils;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import net.bytebuddy.matcher.ElementMatchers;

public class SpringKafkaBatchListenerInstrumentation
extends BaseKafkaInstrumentation {
    @Override
    public ElementMatcher<? super TypeDescription> getTypeMatcher() {
        return ElementMatchers.named("org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer");
    }

    @Override
    public ElementMatcher<? super MethodDescription> getMethodMatcher() {
        return ElementMatchers.named("invokeBatchListener").and(ElementMatchers.takesArguments(1)).and(ElementMatchers.takesArgument(0, ElementMatchers.named("org.apache.kafka.clients.consumer.ConsumerRecords")));
    }

    @Override
    public String getAdviceClassName() {
        return this.getClass().getName() + "$SpringKafkaBatchListenerAdvice";
    }

    public static class SpringKafkaBatchListenerAdvice {
        private static final Logger oneTimeTransactionCreationWarningLogger = LoggerUtils.logOnce(LoggerFactory.getLogger("Spring-Kafka-Batch-Logger"));

        @Nullable
        @Advice.OnMethodEnter(suppress=Throwable.class, inline=false)
        public static Object onEnter(@Advice.This Object thiz) {
            Transaction transaction = TracerAwareInstrumentation.tracer.startRootTransaction(PrivilegedActionUtils.getClassLoader(thiz.getClass()));
            if (transaction != null) {
                ((Transaction)((Transaction)transaction.withType("messaging")).withName("Spring Kafka Message Batch Processing")).activate();
                transaction.setFrameworkName("Kafka");
            } else {
                oneTimeTransactionCreationWarningLogger.warn("Failed to start Spring Kafka transaction for batch processing");
            }
            return transaction;
        }

        @Advice.OnMethodExit(suppress=Throwable.class, onThrowable=Throwable.class, inline=false)
        public static void onExit(@Nullable @Advice.Enter Object transactionObj) {
            Transaction transaction = (Transaction)transactionObj;
            if (transaction != null) {
                ((Transaction)transaction.deactivate()).end();
            }
        }
    }
}

