package io.rsocket.micrometer.observation;

import io.micrometer.common.util.StringUtils;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.netty.buffer.ByteBuf;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import io.rsocket.metadata.RoutingMetadata;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.micrometer.observation.RSocketContext;
import io.rsocket.util.RSocketProxy;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:BOOT-INF/lib/rsocket-micrometer-1.1.4.jar:io/rsocket/micrometer/observation/ObservationResponderRSocketProxy.class */
public class ObservationResponderRSocketProxy extends RSocketProxy {
    private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
    private final ObservationRegistry observationRegistry;

    @Nullable
    private final RSocketResponderObservationConvention observationConvention;

    public ObservationResponderRSocketProxy(RSocket rSocket, ObservationRegistry observationRegistry) {
        this(rSocket, observationRegistry, null);
    }

    public ObservationResponderRSocketProxy(RSocket rSocket, ObservationRegistry observationRegistry, RSocketResponderObservationConvention rSocketResponderObservationConvention) {
        super(rSocket);
        this.observationRegistry = observationRegistry;
        this.observationConvention = rSocketResponderObservationConvention;
    }

    @Override // io.rsocket.util.RSocketProxy, io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        RSocketContext rSocketContext = new RSocketContext(payload, payload.sliceMetadata(), FrameType.REQUEST_FNF, route(payload, payload.sliceMetadata()), RSocketContext.Side.RESPONDER);
        Observation startObservation = startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_FNF, rSocketContext);
        Mono<Void> fireAndForget = super.fireAndForget(rSocketContext.modifiedPayload);
        startObservation.getClass();
        return fireAndForget.doOnError(startObservation::error).doFinally(signalType -> {
            startObservation.stop();
        }).contextWrite(context -> {
            return context.put("micrometer.observation", startObservation);
        });
    }

    private Observation startObservation(RSocketObservationDocumentation rSocketObservationDocumentation, RSocketContext rSocketContext) {
        return rSocketObservationDocumentation.start(this.observationConvention, new DefaultRSocketResponderObservationConvention(rSocketContext), () -> {
            return rSocketContext;
        }, this.observationRegistry);
    }

    @Override // io.rsocket.util.RSocketProxy, io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        RSocketContext rSocketContext = new RSocketContext(payload, payload.sliceMetadata(), FrameType.REQUEST_RESPONSE, route(payload, payload.sliceMetadata()), RSocketContext.Side.RESPONDER);
        Observation startObservation = startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_RESPONSE, rSocketContext);
        Mono<Payload> requestResponse = super.requestResponse(rSocketContext.modifiedPayload);
        startObservation.getClass();
        return requestResponse.doOnError(startObservation::error).doFinally(signalType -> {
            startObservation.stop();
        }).contextWrite(context -> {
            return context.put("micrometer.observation", startObservation);
        });
    }

    @Override // io.rsocket.util.RSocketProxy, io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        ByteBuf sliceMetadata = payload.sliceMetadata();
        RSocketContext rSocketContext = new RSocketContext(payload, sliceMetadata, FrameType.REQUEST_STREAM, route(payload, sliceMetadata), RSocketContext.Side.RESPONDER);
        Observation startObservation = startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_STREAM, rSocketContext);
        Flux<Payload> requestStream = super.requestStream(rSocketContext.modifiedPayload);
        startObservation.getClass();
        return requestStream.doOnError(startObservation::error).doFinally(signalType -> {
            startObservation.stop();
        }).contextWrite(context -> {
            return context.put("micrometer.observation", startObservation);
        });
    }

    @Override // io.rsocket.util.RSocketProxy, io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, flux) -> {
            Payload payload = (Payload) signal.get();
            if (payload == null) {
                return flux;
            }
            String route = route(payload, payload.sliceMetadata());
            RSocketContext rSocketContext = new RSocketContext(payload, payload.sliceMetadata(), FrameType.REQUEST_CHANNEL, route, RSocketContext.Side.RESPONDER);
            Observation startObservation = startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_CHANNEL, rSocketContext);
            if (StringUtils.isNotBlank(route)) {
                startObservation.contextualName(rSocketContext.frameType.name() + " " + route);
            }
            Flux<Payload> requestChannel = super.requestChannel(flux.skip(1L).startWith(rSocketContext.modifiedPayload));
            startObservation.getClass();
            return requestChannel.doOnError(startObservation::error).doFinally(signalType -> {
                startObservation.stop();
            }).contextWrite(context -> {
                return context.put("micrometer.observation", startObservation);
            });
        });
    }

    private String route(Payload payload, ByteBuf byteBuf) {
        if (!payload.hasMetadata()) {
            return null;
        }
        try {
            ByteBuf extract = CompositeMetadataUtils.extract(byteBuf, WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString());
            if (extract != null) {
                return new RoutingMetadata(extract).iterator().next();
            }
            return null;
        } catch (Exception e) {
            return null;
        }
    }
}
