/*
 * Decompiled with CFR 0.152.
 */
package co.elastic.apm.agent.kafka;

import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.kafka.BaseKafkaHeadersInstrumentation;
import co.elastic.apm.agent.kafka.KafkaProducerInstrumentation;
import co.elastic.apm.agent.kafka.helper.KafkaInstrumentationHeadersHelper;
import co.elastic.apm.agent.kafka.helper.KafkaInstrumentationHelper;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.matcher.ElementMatcher;
import net.bytebuddy.matcher.ElementMatchers;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerHeadersInstrumentation
extends BaseKafkaHeadersInstrumentation {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerInstrumentation.class);

    @Override
    public ElementMatcher<? super TypeDescription> getTypeMatcher() {
        return ElementMatchers.named("org.apache.kafka.clients.producer.KafkaProducer");
    }

    @Override
    public ElementMatcher<? super MethodDescription> getMethodMatcher() {
        return ElementMatchers.named("doSend").and(ElementMatchers.takesArgument(0, ElementMatchers.named("org.apache.kafka.clients.producer.ProducerRecord")));
    }

    @Override
    public String getAdviceClassName() {
        return this.getClass().getName() + "$KafkaProducerHeadersAdvice";
    }

    public static class KafkaProducerHeadersAdvice {
        private static final KafkaInstrumentationHelper helper = KafkaInstrumentationHelper.get();
        private static final KafkaInstrumentationHeadersHelper headersHelper = KafkaInstrumentationHeadersHelper.get();
        private static boolean headersSupported = true;

        @Nullable
        @Advice.AssignReturned.ToArguments(value={@Advice.AssignReturned.ToArguments.ToArgument(value=1, index=1, typing=Assigner.Typing.DYNAMIC)})
        @Advice.OnMethodEnter(suppress=Throwable.class, inline=false)
        public static Object[] beforeSend(@Advice.FieldValue(value="apiVersions") ApiVersions apiVersions, @Advice.Argument(value=0) ProducerRecord<?, ?> record, @Nullable @Advice.Argument(value=1) Callback callback) {
            Span span = helper.onSendStart(record);
            if (span == null) {
                return null;
            }
            if (apiVersions.maxUsableProduceMagic() >= 2 && headersSupported) {
                try {
                    headersHelper.setOutgoingTraceContextHeaders(span, record);
                }
                catch (IllegalStateException e) {
                    logger.debug("Failed to add header to Kafka record {}, probably to headers' read-only state.", (Object)record);
                }
            }
            return new Object[]{span, helper.wrapCallback(callback, span)};
        }

        @Nullable
        @Advice.AssignReturned.ToThrown(index=0, typing=Assigner.Typing.DYNAMIC)
        @Advice.OnMethodExit(onThrowable=Throwable.class, suppress=Throwable.class, inline=false)
        public static Object[] afterSend(@Advice.Enter @Nullable Object[] enter, @Advice.Argument(value=0) ProducerRecord<?, ?> record, @Advice.Argument(value=1) Callback callback, @Advice.This KafkaProducer<?, ?> thiz, @Advice.Thrown @Nullable Throwable throwable) {
            Span span;
            Span span2 = span = enter != null ? (Span)enter[0] : null;
            if (span == null) {
                return null;
            }
            Object[] overrideThrowable = null;
            if (throwable != null && throwable.getMessage().contains("Magic v1 does not support record headers")) {
                logger.info("Adding header to Kafka record is not allowed with the used broker, attempting to resend record");
                ProducerRecord copy = new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), record.value(), (Iterable)record.headers());
                headersHelper.removeTraceContextHeader(copy);
                headersSupported = false;
                thiz.send(copy, callback);
                overrideThrowable = new Object[]{null};
            }
            helper.onSendEnd(span, record, thiz, throwable);
            return overrideThrowable;
        }
    }
}

