/*
 * Decompiled with CFR 0.152.
 */
package co.elastic.apm.agent.reactor;

import co.elastic.apm.agent.collections.WeakConcurrentProviderImpl;
import co.elastic.apm.agent.impl.Tracer;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.reactor.ReactorInstrumentation;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.sdk.state.GlobalVariables;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/**
 * A {@link CoreSubscriber} decorator that activates an {@link AbstractSpan} around every
 * signal ({@code onSubscribe}, {@code onNext}, {@code onError}, {@code onComplete}) delivered
 * to the wrapped subscriber, and deactivates it again afterwards.
 * <p>
 * The span associated with each wrapper is kept in a static weak map keyed by the wrapper
 * instance. The entry is removed once a terminal signal has been delivered or a signal handler
 * of the wrapped subscriber has thrown.
 *
 * @param <T> the type of element signaled to the wrapped subscriber
 */
public class TracedSubscriber<T>
implements CoreSubscriber<T> {
    private static final Logger log = LoggerFactory.getLogger(TracedSubscriber.class);

    /** Global flag recording whether the Reactor operator hook is currently installed. */
    private static final AtomicBoolean isRegistered = GlobalVariables.get(ReactorInstrumentation.class, "reactor-hook-enabled", new AtomicBoolean(false));

    /** Span to activate for each wrapper instance. */
    private static final WeakMap<TracedSubscriber<?>, AbstractSpan<?>> contextMap = WeakConcurrentProviderImpl.createWeakSpanMap();

    /** Per-class cache of whether {@code toString()} can be called without throwing. */
    private static final WeakMap<Class<?>, Boolean> toStringSupported = WeakConcurrent.buildMap();

    /** Key under which the operator hook is registered with (and removed from) {@link Hooks}. */
    private static final String HOOK_KEY = "elastic-apm";

    private final CoreSubscriber<? super T> subscriber;
    private final Tracer tracer;
    private final Context context;

    /**
     * Wraps {@code subscriber} so that {@code context} is active while it handles signals.
     *
     * @param subscriber the subscriber to delegate every signal to
     * @param tracer     the tracer used to look up the currently active span
     * @param context    the span to activate around each signal; it is also stored in the
     *                   Reactor {@link Context} under the {@code AbstractSpan.class} key
     */
    TracedSubscriber(CoreSubscriber<? super T> subscriber, Tracer tracer, AbstractSpan<?> context) {
        this.subscriber = subscriber;
        this.tracer = tracer;
        contextMap.put(this, context);
        // Expose the span through the Reactor context so the operator hook can fall back to it
        // when no span is active at wrapping time.
        this.context = subscriber.currentContext().put(AbstractSpan.class, context);
    }

    /**
     * @return the wrapped subscriber's Reactor context augmented with the span of this wrapper
     */
    @Override
    public Context currentContext() {
        return this.context;
    }

    /**
     * Forwards {@code onSubscribe} with the span activated. The span mapping is discarded only
     * if the wrapped subscriber throws.
     */
    @Override
    public void onSubscribe(Subscription s) {
        AbstractSpan<?> span = getContext();
        boolean hasActivated = doEnter("onSubscribe", span);
        boolean failed = true;
        try {
            this.subscriber.onSubscribe(s);
            failed = false;
        } finally {
            // Cleanup runs exactly once, whether the delegate returned or threw.
            doExit(hasActivated, "onSubscribe", span);
            discardIf(failed);
        }
    }

    /**
     * Forwards {@code onNext} with the span activated. The span mapping is discarded only if
     * the wrapped subscriber throws.
     */
    @Override
    public void onNext(T next) {
        AbstractSpan<?> span = getContext();
        boolean hasActivated = doEnter("onNext", span);
        boolean failed = true;
        try {
            this.subscriber.onNext(next);
            failed = false;
        } finally {
            // Cleanup runs exactly once, whether the delegate returned or threw.
            doExit(hasActivated, "onNext", span);
            discardIf(failed);
        }
    }

    /**
     * Forwards {@code onError} with the span activated and always discards the span mapping
     * afterwards, since this is a terminal signal.
     */
    @Override
    public void onError(Throwable t) {
        AbstractSpan<?> span = getContext();
        boolean hasActivated = doEnter("onError", span);
        try {
            this.subscriber.onError(t);
        } finally {
            doExit(hasActivated, "onError", span);
            discardIf(true);
        }
    }

    /**
     * Forwards {@code onComplete} with the span activated and always discards the span mapping
     * afterwards, since this is a terminal signal.
     */
    @Override
    public void onComplete() {
        AbstractSpan<?> span = getContext();
        boolean hasActivated = doEnter("onComplete", span);
        try {
            this.subscriber.onComplete();
        } finally {
            doExit(hasActivated, "onComplete", span);
            discardIf(true);
        }
    }

    /**
     * Activates {@code context} unless it is {@code null} or already the tracer's active span.
     *
     * @param method  name of the signal being handled, used for trace logging only
     * @param context the span to activate, may be {@code null}
     * @return {@code true} if this call activated the span and {@link #doExit} should
     *         deactivate it, {@code false} otherwise
     */
    private boolean doEnter(String method, @Nullable AbstractSpan<?> context) {
        debugTrace(true, method, context);
        if (context == null || this.tracer.getActive() == context) {
            return false;
        }
        context.activate();
        return true;
    }

    /**
     * Deactivates {@code context} if the matching {@link #doEnter} call activated it and it is
     * still the tracer's active span.
     *
     * @param deactivate the value returned by the matching {@link #doEnter} call
     * @param method     name of the signal being handled, used for trace logging only
     * @param context    the span to deactivate, may be {@code null}
     */
    private void doExit(boolean deactivate, String method, @Nullable AbstractSpan<?> context) {
        debugTrace(false, method, context);
        if (context == null || !deactivate) {
            return;
        }
        // Something else is active now: leave the activation state untouched.
        if (context != this.tracer.getActive()) {
            return;
        }
        context.deactivate();
    }

    /**
     * Removes this wrapper's span mapping when {@code condition} is {@code true}.
     */
    private void discardIf(boolean condition) {
        if (condition) {
            contextMap.remove(this);
        }
    }

    /**
     * Logs entering ({@code >>}) or leaving ({@code <<}) a signal handler at trace level.
     */
    private void debugTrace(boolean isEnter, String method, @Nullable AbstractSpan<?> context) {
        if (!log.isTraceEnabled()) {
            return;
        }
        log.trace("{} reactor {} {}", isEnter ? ">>" : "<<", method, context);
    }

    /**
     * @return the span mapped to this wrapper, or {@code null} if the mapping is no longer present
     */
    @Nullable
    private AbstractSpan<?> getContext() {
        return contextMap.get(this);
    }

    /**
     * Installs the operator hook that wraps subscribers, unless it is already installed.
     *
     * @param tracer the tracer handed to every {@link TracedSubscriber} the hook creates
     */
    static void registerHooks(Tracer tracer) {
        if (isRegistered.getAndSet(true)) {
            return;
        }
        Hooks.onEachOperator(HOOK_KEY, TracedSubscriber.<Object>wrapOperators(tracer));
    }

    /**
     * Removes the operator hook if it is currently installed.
     */
    static void unregisterHooks() {
        if (!isRegistered.getAndSet(false)) {
            return;
        }
        Hooks.resetOnEachOperator(HOOK_KEY);
    }

    /**
     * @return {@code true} if the operator hook is currently flagged as installed
     */
    static boolean isHookRegistered() {
        return isRegistered.get();
    }

    /**
     * Builds the operator hook function. For each (publisher, subscriber) pair it returns the
     * subscriber unchanged when the publisher is a {@link Fuseable.ScalarCallable} or when no
     * span can be found, and a {@link TracedSubscriber} bound to the found span otherwise.
     * The span is taken from the tracer first and from the subscriber's Reactor context second.
     */
    private static <X> Function<? super Publisher<X>, ? extends Publisher<X>> wrapOperators(final Tracer tracer) {
        // The raw Publisher type argument is dictated by the signature of Operators.liftPublisher.
        return Operators.liftPublisher(new BiFunction<Publisher, CoreSubscriber<? super X>, CoreSubscriber<? super X>>() {

            @Override
            public CoreSubscriber<? super X> apply(Publisher publisher, CoreSubscriber<? super X> subscriber) {
                if (publisher instanceof Fuseable.ScalarCallable) {
                    if (log.isTraceEnabled()) {
                        log.trace("skip wrapping {}", safeToString(subscriber));
                    }
                    return subscriber;
                }
                AbstractSpan<?> active = tracer.getActive();
                if (active == null) {
                    // Fall back to a span stored in the context by an enclosing TracedSubscriber.
                    active = subscriber.currentContext().getOrDefault(AbstractSpan.class, null);
                }
                if (active == null) {
                    return subscriber;
                }
                if (log.isTraceEnabled()) {
                    log.trace("wrapping subscriber {} publisher {} with active span/transaction {}", safeToString(subscriber), publisher, active);
                }
                return new TracedSubscriber<X>(subscriber, tracer, active);
            }
        });
    }

    /**
     * Returns {@code subscriber.toString()}, or {@code "???"} when that call throws or is known
     * to throw for the subscriber's class. The outcome is cached per class.
     */
    private static String safeToString(CoreSubscriber<?> subscriber) {
        Class<?> type = subscriber.getClass();
        Boolean supported = toStringSupported.get(type);
        String value = "???";
        if (Boolean.FALSE.equals(supported)) {
            return value;
        }
        try {
            value = subscriber.toString();
            if (supported == null) {
                toStringSupported.put(type, Boolean.TRUE);
            }
        } catch (Exception e) {
            // toString() is unusable for this class: remember that and keep the placeholder.
            toStringSupported.put(type, Boolean.FALSE);
        }
        return value;
    }
}

