/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.micrometer.observation;

import io.micrometer.common.util.StringUtils;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.netty.buffer.ByteBuf;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import io.rsocket.metadata.RoutingMetadata;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.micrometer.observation.CompositeMetadataUtils;
import io.rsocket.micrometer.observation.DefaultRSocketResponderObservationConvention;
import io.rsocket.micrometer.observation.RSocketContext;
import io.rsocket.micrometer.observation.RSocketObservationDocumentation;
import io.rsocket.micrometer.observation.RSocketResponderObservationConvention;
import io.rsocket.util.RSocketProxy;
import java.util.Iterator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

public class ObservationResponderRSocketProxy
extends RSocketProxy {
    private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
    private final ObservationRegistry observationRegistry;
    @Nullable
    private final RSocketResponderObservationConvention observationConvention;

    public ObservationResponderRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
        this(source, observationRegistry, null);
    }

    public ObservationResponderRSocketProxy(RSocket source, ObservationRegistry observationRegistry, RSocketResponderObservationConvention observationConvention) {
        super(source);
        this.observationRegistry = observationRegistry;
        this.observationConvention = observationConvention;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        ByteBuf sliceMetadata = payload.sliceMetadata();
        String route = this.route(payload, sliceMetadata);
        RSocketContext rSocketContext = new RSocketContext(payload, payload.sliceMetadata(), FrameType.REQUEST_FNF, route, RSocketContext.Side.RESPONDER);
        Observation newObservation = this.startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_FNF, rSocketContext);
        return super.fireAndForget(rSocketContext.modifiedPayload).doOnError(arg_0 -> ((Observation)newObservation).error(arg_0)).doFinally(signalType -> newObservation.stop()).contextWrite(context -> context.put((Object)MICROMETER_OBSERVATION_KEY, (Object)newObservation));
    }

    private Observation startObservation(RSocketObservationDocumentation observation, RSocketContext rSocketContext) {
        return observation.start(this.observationConvention, new DefaultRSocketResponderObservationConvention(rSocketContext), () -> rSocketContext, this.observationRegistry);
    }

    public Mono<Payload> requestResponse(Payload payload) {
        ByteBuf sliceMetadata = payload.sliceMetadata();
        String route = this.route(payload, sliceMetadata);
        RSocketContext rSocketContext = new RSocketContext(payload, payload.sliceMetadata(), FrameType.REQUEST_RESPONSE, route, RSocketContext.Side.RESPONDER);
        Observation newObservation = this.startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_RESPONSE, rSocketContext);
        return super.requestResponse(rSocketContext.modifiedPayload).doOnError(arg_0 -> ((Observation)newObservation).error(arg_0)).doFinally(signalType -> newObservation.stop()).contextWrite(context -> context.put((Object)MICROMETER_OBSERVATION_KEY, (Object)newObservation));
    }

    public Flux<Payload> requestStream(Payload payload) {
        ByteBuf sliceMetadata = payload.sliceMetadata();
        String route = this.route(payload, sliceMetadata);
        RSocketContext rSocketContext = new RSocketContext(payload, sliceMetadata, FrameType.REQUEST_STREAM, route, RSocketContext.Side.RESPONDER);
        Observation newObservation = this.startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_STREAM, rSocketContext);
        return super.requestStream(rSocketContext.modifiedPayload).doOnError(arg_0 -> ((Observation)newObservation).error(arg_0)).doFinally(signalType -> newObservation.stop()).contextWrite(context -> context.put((Object)MICROMETER_OBSERVATION_KEY, (Object)newObservation));
    }

    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.from(payloads).switchOnFirst((firstSignal, flux) -> {
            Payload firstPayload = (Payload)firstSignal.get();
            if (firstPayload != null) {
                ByteBuf sliceMetadata = firstPayload.sliceMetadata();
                String route = this.route(firstPayload, sliceMetadata);
                RSocketContext rSocketContext = new RSocketContext(firstPayload, firstPayload.sliceMetadata(), FrameType.REQUEST_CHANNEL, route, RSocketContext.Side.RESPONDER);
                Observation newObservation = this.startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_CHANNEL, rSocketContext);
                if (StringUtils.isNotBlank((String)route)) {
                    newObservation.contextualName(rSocketContext.frameType.name() + " " + route);
                }
                return super.requestChannel((Publisher)flux.skip(1L).startWith((Object[])new Payload[]{rSocketContext.modifiedPayload})).doOnError(arg_0 -> ((Observation)newObservation).error(arg_0)).doFinally(signalType -> newObservation.stop()).contextWrite(context -> context.put((Object)MICROMETER_OBSERVATION_KEY, (Object)newObservation));
            }
            return flux;
        });
    }

    private String route(Payload payload, ByteBuf headers) {
        if (payload.hasMetadata()) {
            try {
                ByteBuf extract = CompositeMetadataUtils.extract(headers, WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString());
                if (extract != null) {
                    RoutingMetadata routingMetadata = new RoutingMetadata(extract);
                    Iterator iterator = routingMetadata.iterator();
                    return (String)iterator.next();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        return null;
    }
}

