package io.rsocket.micrometer.observation;

import io.micrometer.common.util.StringUtils;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.docs.ObservationDocumentation;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import io.rsocket.metadata.RoutingMetadata;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.micrometer.observation.RSocketContext;
import io.rsocket.util.RSocketProxy;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.context.ContextView;

/* loaded from: input_file:BOOT-INF/lib/rsocket-micrometer-1.1.4.jar:io/rsocket/micrometer/observation/ObservationRequesterRSocketProxy.class */
/**
 * Requester-side {@link RSocket} decorator that wraps each outgoing interaction
 * ({@code fireAndForget}, {@code requestResponse}, {@code requestStream} and
 * {@code requestChannel}) in a Micrometer {@link Observation}.
 *
 * <p>For every request the proxy:
 * <ol>
 *   <li>looks up a parent observation in the subscriber's Reactor context under
 *       {@code "micrometer.observation"};</li>
 *   <li>creates and starts a new observation backed by an {@link RSocketContext};</li>
 *   <li>delegates to the wrapped {@link RSocket}, using the context's modified payload when one
 *       has been set and the caller's payload otherwise;</li>
 *   <li>records errors on the observation, stops it when the delegated publisher terminates or
 *       is cancelled, and publishes it into the Reactor context of the delegated call.</li>
 * </ol>
 */
public class ObservationRequesterRSocketProxy extends RSocketProxy {
    /** Reactor context key under which the current {@link Observation} is read and published. */
    private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";

    private final ObservationRegistry observationRegistry;

    /** Optional custom convention; when {@code null} only the default convention is supplied. */
    @Nullable
    private final RSocketRequesterObservationConvention observationConvention;

    /**
     * Creates a proxy without a custom observation convention.
     *
     * @param rSocket the delegate that actually performs the requests
     * @param observationRegistry registry used to create observations
     */
    public ObservationRequesterRSocketProxy(RSocket rSocket, ObservationRegistry observationRegistry) {
        this(rSocket, observationRegistry, null);
    }

    /**
     * Creates a proxy with an optional custom observation convention.
     *
     * @param rSocket the delegate that actually performs the requests
     * @param observationRegistry registry used to create observations
     * @param rSocketRequesterObservationConvention custom convention, or {@code null} for none
     */
    public ObservationRequesterRSocketProxy(RSocket rSocket, ObservationRegistry observationRegistry, @Nullable RSocketRequesterObservationConvention rSocketRequesterObservationConvention) {
        super(rSocket);
        this.observationRegistry = observationRegistry;
        this.observationConvention = rSocketRequesterObservationConvention;
    }

    /** Observed variant of {@link RSocket#fireAndForget(Payload)}. */
    @Override // io.rsocket.util.RSocketProxy, io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        return setObservation(super::fireAndForget, payload, FrameType.REQUEST_FNF, RSocketObservationDocumentation.RSOCKET_REQUESTER_FNF);
    }

    /** Observed variant of {@link RSocket#requestResponse(Payload)}. */
    @Override // io.rsocket.util.RSocketProxy, io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        return setObservation(super::requestResponse, payload, FrameType.REQUEST_RESPONSE, RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_RESPONSE);
    }

    /**
     * Defers the observed call until subscription so the subscriber's Reactor context (and any
     * parent observation stored in it) is available.
     *
     * @param function the delegate call to perform with the payload to send
     * @param payload the payload supplied by the caller
     * @param frameType the frame type of the interaction
     * @param observationDocumentation the documented observation to create
     * @param <T> element type of the resulting {@link Mono}
     * @return a {@link Mono} that creates the observation and performs the call on subscription
     */
    <T> Mono<T> setObservation(Function<Payload, Mono<T>> function, Payload payload, FrameType frameType, ObservationDocumentation observationDocumentation) {
        return Mono.deferContextual(contextView -> observe(function, payload, frameType, observationDocumentation, contextView));
    }

    /**
     * Extracts the first routing tag from the payload's composite metadata.
     *
     * @param payload the outgoing payload
     * @return the first route, or {@code null} when the payload has no metadata or the route
     *     cannot be read
     */
    @Nullable
    private String route(Payload payload) {
        if (!payload.hasMetadata()) {
            return null;
        }
        try {
            return new RoutingMetadata(CompositeMetadataUtils.extract(payload.sliceMetadata(), WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())).iterator().next();
        } catch (Exception ignored) {
            // Best effort: the route only enriches the observation, so a payload whose metadata
            // cannot be parsed as routing metadata must not fail the request itself.
            return null;
        }
    }

    /**
     * Starts an observation for a single-result interaction and attaches it to the delegated
     * {@link Mono}.
     */
    private <T> Mono<T> observe(Function<Payload, Mono<T>> function, Payload payload, FrameType frameType, ObservationDocumentation observationDocumentation, ContextView contextView) {
        String route = route(payload);
        RSocketContext rSocketContext = newContext(payload, frameType, route);
        Observation observation = startObservation(rSocketContext, frameType, route, observationDocumentation, contextView);
        return function.apply(outgoingPayload(rSocketContext, payload))
                .doOnError(observation::error)
                .doFinally(signalType -> observation.stop())
                .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
    }

    /** Observed variant of {@link RSocket#requestStream(Payload)}. */
    @Override // io.rsocket.util.RSocketProxy, io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        return observationFlux(super::requestStream, payload, FrameType.REQUEST_STREAM, RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM);
    }

    /**
     * Observed variant of {@link RSocket#requestChannel(Publisher)}.
     *
     * <p>The observation is built from the first payload only. If the first signal carries no
     * payload, the inbound flux is returned as is and no observation is created.
     */
    @Override // io.rsocket.util.RSocketProxy, io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((firstSignal, flux) -> {
            Payload firstPayload = firstSignal.get();
            if (firstPayload == null) {
                return flux;
            }
            // Swap the first element for the payload chosen by the observation machinery and
            // forward the remaining elements untouched.
            return observationFlux(
                    outgoing -> super.requestChannel(flux.skip(1L).startWith(outgoing)),
                    firstPayload,
                    FrameType.REQUEST_CHANNEL,
                    RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_CHANNEL);
        });
    }

    /**
     * Starts an observation for a streaming interaction and attaches it to the delegated
     * {@link Flux}. Creation is deferred until subscription so the subscriber's Reactor context
     * is available.
     */
    private Flux<Payload> observationFlux(Function<Payload, Flux<Payload>> function, Payload payload, FrameType frameType, ObservationDocumentation observationDocumentation) {
        return Flux.deferContextual(contextView -> {
            String route = route(payload);
            RSocketContext rSocketContext = newContext(payload, frameType, route);
            Observation observation = startObservation(rSocketContext, frameType, route, observationDocumentation, contextView);
            // Fall back to the caller's payload when nothing replaced it; previously a null
            // modifiedPayload was handed straight to the delegate on this path.
            return function.apply(outgoingPayload(rSocketContext, payload))
                    .doOnError(observation::error)
                    .doFinally(signalType -> observation.stop())
                    .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
        });
    }

    /** Builds the requester-side observation context for the given payload. */
    private RSocketContext newContext(Payload payload, FrameType frameType, @Nullable String route) {
        return new RSocketContext(payload, payload.sliceMetadata(), frameType, route, RSocketContext.Side.REQUESTER);
    }

    /**
     * Creates the observation, links it to the parent found in the Reactor context (if any),
     * names it and starts it.
     */
    private Observation startObservation(RSocketContext rSocketContext, FrameType frameType, @Nullable String route, ObservationDocumentation observationDocumentation, ContextView contextView) {
        Observation parent = contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
        Observation observation = observationDocumentation
                .observation(this.observationConvention, new DefaultRSocketRequesterObservationConvention(rSocketContext), () -> rSocketContext, this.observationRegistry)
                .parentObservation(parent);
        setContextualName(frameType, route, observation);
        observation.start();
        return observation;
    }

    /**
     * Chooses the payload to send. Must be called after the observation has been started, since
     * {@code modifiedPayload} is read at that point (presumably populated by an observation
     * handler during start; verify against the handlers in use).
     */
    private static Payload outgoingPayload(RSocketContext rSocketContext, Payload original) {
        Payload modified = rSocketContext.modifiedPayload;
        return modified != null ? modified : original;
    }

    /** Names the observation {@code "<FRAME_TYPE> <route>"}, or just the frame type without a route. */
    private void setContextualName(FrameType frameType, String str, Observation observation) {
        if (StringUtils.isNotBlank(str)) {
            observation.contextualName(frameType.name() + " " + str);
        } else {
            observation.contextualName(frameType.name());
        }
    }
}
