/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.loadbalance;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import io.rsocket.loadbalance.FluxDeferredResolution;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.loadbalance.MonoDeferredResolution;
import io.rsocket.loadbalance.RSocketPool;
import io.rsocket.loadbalance.ResolvingOperator;
import io.rsocket.loadbalance.Stats;
import io.rsocket.loadbalance.WeightedRSocket;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/**
 * An {@link RSocket} facade that belongs to an {@link RSocketPool} and records request
 * statistics in a {@link Stats} instance.
 *
 * <p>The instance is itself the subscriber of {@code rSocketSource}: the value received in
 * {@link #onNext(RSocket)} is stored and handed to the {@link ResolvingOperator} machinery in
 * {@link #onComplete()}. Every interaction method returns a deferred inner publisher which
 * receives the resolved {@link RSocket} (or a resolution error) through its {@code accept}
 * callback and only then issues the real request.
 *
 * <p>On disposal the instance removes itself from {@code parent.activeSockets}.
 */
final class PooledWeightedRSocket
        extends ResolvingOperator<RSocket>
        implements CoreSubscriber<RSocket>, WeightedRSocket {

    /** Pool whose {@code activeSockets} array this instance is removed from on dispose. */
    final RSocketPool parent;

    /** Source subscribed to in {@link #doSubscribe()} to obtain the underlying RSocket. */
    final Mono<RSocket> rSocketSource;

    /** Target descriptor exposed through {@link #target()}. */
    final LoadbalanceTarget loadbalanceTarget;

    /** Statistics updated by this socket and by the request-tracking inner publishers. */
    final Stats stats;

    /** Subscription to {@code rSocketSource}; only mutated through {@link #S}. */
    volatile Subscription s;

    static final AtomicReferenceFieldUpdater<PooledWeightedRSocket, Subscription> S =
            AtomicReferenceFieldUpdater.newUpdater(PooledWeightedRSocket.class, Subscription.class, "s");

    /**
     * @param parent            pool that owns this socket
     * @param rSocketSource     source of the underlying {@link RSocket}
     * @param loadbalanceTarget target this socket was created for
     * @param stats             statistics holder updated by this socket
     */
    PooledWeightedRSocket(
            RSocketPool parent,
            Mono<RSocket> rSocketSource,
            LoadbalanceTarget loadbalanceTarget,
            Stats stats) {
        this.parent = parent;
        this.rSocketSource = rSocketSource;
        this.loadbalanceTarget = loadbalanceTarget;
        this.stats = stats;
    }

    /** Stores the subscription once and, if it was accepted, requests an unbounded amount. */
    @Override
    public void onSubscribe(Subscription s) {
        // No cast on `this`: the updater is typed on PooledWeightedRSocket, so passing an
        // Object would not satisfy Operators.setOnce's generic signature.
        if (Operators.setOnce(S, this, s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    /**
     * Publishes the value captured by {@link #onNext(RSocket)}, or terminates with an
     * {@link IllegalStateException} when the source completed without emitting.
     *
     * <p>If the subscription was already cancelled, or the subscription field changed
     * concurrently, only {@code doFinally()} is invoked.
     */
    @Override
    public void onComplete() {
        final Subscription current = this.s;
        final RSocket resolved = this.value;

        if (current == Operators.cancelledSubscription() || !S.compareAndSet(this, current, null)) {
            this.doFinally();
            return;
        }

        if (resolved == null) {
            this.terminate(new IllegalStateException("Source completed empty"));
        } else {
            this.complete(resolved);
        }
    }

    /**
     * Marks the subscription as cancelled and terminates with {@code t}. When the subscription
     * had already been cancelled the error is dropped through
     * {@link Operators#onErrorDropped(Throwable, Context)} instead.
     */
    @Override
    public void onError(Throwable t) {
        final Subscription cancelled = Operators.cancelledSubscription();
        // The getAndSet is only evaluated when the plain read did not already observe the
        // cancelled marker (short-circuit), exactly as in the original condition.
        final boolean alreadyCancelled =
                this.s == cancelled || S.getAndSet(this, cancelled) == cancelled;

        this.doFinally();

        if (alreadyCancelled) {
            Operators.onErrorDropped(t, Context.empty());
            return;
        }
        this.terminate(t);
    }

    /**
     * Captures the emitted {@link RSocket}. If the subscription is already cancelled the value
     * is expired immediately via {@link #doOnValueExpired(RSocket)} instead of being stored.
     */
    @Override
    public void onNext(RSocket value) {
        if (this.s == Operators.cancelledSubscription()) {
            this.doOnValueExpired(value);
            return;
        }
        this.value = value;
        this.doFinally();
    }

    /** Subscribes this instance to {@code rSocketSource}. */
    @Override
    protected void doSubscribe() {
        this.rSocketSource.subscribe(this);
    }

    /**
     * Sets availability to {@code 1.0} and arranges for {@code invalidate()} to be called when
     * the resolved socket's {@code onClose()} signals an error or completes.
     */
    @Override
    protected void doOnValueResolved(RSocket value) {
        this.stats.setAvailability(1.0);
        value.onClose().subscribe(null, t -> this.invalidate(), this::invalidate);
    }

    /** Sets availability to {@code 0.0}, disposes the expired socket and then this instance. */
    @Override
    protected void doOnValueExpired(RSocket value) {
        this.stats.setAvailability(0.0);
        value.dispose();
        this.dispose();
    }

    /** Delegates to the superclass implementation. */
    @Override
    public void dispose() {
        super.dispose();
    }

    /**
     * Removes this instance from {@code parent.activeSockets}, sets availability to
     * {@code 0.0} and terminates the subscription to {@code rSocketSource}.
     *
     * <p>The removal builds a copy of the array without this element and installs it with a
     * compare-and-set, retrying on a fresh snapshot when the CAS fails. If this instance is not
     * present in the snapshot, the array is left untouched.
     */
    @Override
    protected void doOnDispose() {
        final RSocketPool pool = this.parent;
        for (;;) {
            final PooledWeightedRSocket[] snapshot = pool.activeSockets;
            final int index = indexOfSelf(snapshot);
            if (index == -1) {
                break;
            }

            final int lastIndex = snapshot.length - 1;
            final PooledWeightedRSocket[] remaining = new PooledWeightedRSocket[lastIndex];
            if (index != 0) {
                System.arraycopy(snapshot, 0, remaining, 0, index);
            }
            if (index != lastIndex) {
                System.arraycopy(snapshot, index + 1, remaining, index, lastIndex - index);
            }

            if (RSocketPool.ACTIVE_SOCKETS.compareAndSet(pool, snapshot, remaining)) {
                break;
            }
        }
        this.stats.setAvailability(0.0);
        Operators.terminate(S, this);
    }

    /** Returns the position of this instance in {@code sockets} (by identity), or -1. */
    private int indexOfSelf(PooledWeightedRSocket[] sockets) {
        for (int i = 0; i < sockets.length; i++) {
            if (sockets[i] == this) {
                return i;
            }
        }
        return -1;
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return new RequestTrackingMonoInner<Void>(this, payload, FrameType.REQUEST_FNF);
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return new RequestTrackingMonoInner<Payload>(this, payload, FrameType.REQUEST_RESPONSE);
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return new RequestTrackingFluxInner<Payload>(this, payload, FrameType.REQUEST_STREAM);
    }

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return new RequestTrackingFluxInner<Publisher<Payload>>(this, payloads, FrameType.REQUEST_CHANNEL);
    }

    @Override
    public Mono<Void> metadataPush(Payload payload) {
        return new RequestTrackingMonoInner<Void>(this, payload, FrameType.METADATA_PUSH);
    }

    /** Returns the statistics holder passed at construction. */
    @Override
    public Stats stats() {
        return this.stats;
    }

    /** Returns the load-balance target passed at construction. */
    LoadbalanceTarget target() {
        return this.loadbalanceTarget;
    }

    /** Returns {@code stats.availability()}. */
    @Override
    public double availability() {
        return this.stats.availability();
    }

    /**
     * Deferred {@code requestStream} / {@code requestChannel} that brackets the delegated call
     * with {@code Stats.startStream()} / {@code Stats.stopStream()}.
     *
     * <p>NOTE(review): the literals {@code 2L}, {@code -2L} and {@code Long.MIN_VALUE} compared
     * against {@code requested} look like state constants inlined from the superclass; their
     * names are not visible here. {@code onComplete}/{@code onError} store {@code 2L} while
     * {@code cancel} treats {@code Long.MIN_VALUE} as "already terminated" - confirm that the
     * two markers are intentionally distinct.
     */
    static final class RequestTrackingFluxInner<INPUT>
            extends FluxDeferredResolution<INPUT, RSocket> {

        RequestTrackingFluxInner(PooledWeightedRSocket parent, INPUT fluxOrPayload, FrameType requestType) {
            super(parent, fluxOrPayload, requestType);
        }

        /** Stats of the owning {@link PooledWeightedRSocket}. */
        private Stats trackedStats() {
            return ((PooledWeightedRSocket) this.parent).stats;
        }

        /**
         * Resolution callback.
         *
         * @param rSocket resolved socket; when {@code null} (and {@code t} is {@code null}) this
         *                instance re-registers itself with the parent via {@code add}
         * @param t       resolution failure, or {@code null}
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void accept(RSocket rSocket, Throwable t) {
            if (this.isTerminated()) {
                return;
            }

            if (t != null) {
                // Only a stream request holds a single Payload that can be released here.
                if (this.requestType == FrameType.REQUEST_STREAM) {
                    ReferenceCountUtil.safeRelease(this.fluxOrPayload);
                }
                // NOTE(review): this routes to the overridden onError below, which calls
                // stopStream() although startStream() was never called - confirm intended.
                this.onError(t);
                return;
            }

            if (rSocket == null) {
                this.parent.add(this);
                return;
            }

            final Flux<Payload> source;
            switch (this.requestType) {
                case REQUEST_STREAM:
                    source = rSocket.requestStream((Payload) this.fluxOrPayload);
                    break;
                case REQUEST_CHANNEL:
                    // requestChannel(...) on the outer class accepts any Publisher, so cast to
                    // Publisher rather than Flux to avoid a ClassCastException for non-Flux
                    // inputs (e.g. a Mono).
                    source = rSocket.requestChannel((Publisher<Payload>) this.fluxOrPayload);
                    break;
                default:
                    Operators.error((Subscriber<?>) this.actual, new IllegalStateException("Should never happen"));
                    return;
            }

            trackedStats().startStream();
            source.subscribe((CoreSubscriber) this);
        }

        /** Records the end of the stream once, then forwards completion. */
        @Override
        public void onComplete() {
            final long state = this.requested;
            if (state != 2L && REQUESTED.compareAndSet(this, state, 2L)) {
                trackedStats().stopStream();
                super.onComplete();
            }
        }

        /** Records the end of the stream once, then forwards the error. */
        @Override
        public void onError(Throwable t) {
            final long state = this.requested;
            if (state != 2L && REQUESTED.compareAndSet(this, state, 2L)) {
                trackedStats().stopStream();
                super.onError(t);
            }
        }

        /**
         * Cancels the request. When the previous state was {@code -2L} the upstream
         * subscription is cancelled and the stream is recorded as stopped; otherwise this
         * instance is removed from the parent and a pending stream payload is released.
         */
        @Override
        public void cancel() {
            final long state = REQUESTED.getAndSet(this, Long.MIN_VALUE);
            if (state == Long.MIN_VALUE) {
                return;
            }

            if (state == -2L) {
                this.s.cancel();
                trackedStats().stopStream();
            } else {
                this.parent.remove(this);
                if (this.requestType == FrameType.REQUEST_STREAM) {
                    ReferenceCountUtil.safeRelease(this.fluxOrPayload);
                }
            }
        }
    }

    /**
     * Deferred {@code fireAndForget} / {@code requestResponse} / {@code metadataPush} that
     * brackets the delegated call with {@code Stats.startRequest()} /
     * {@code Stats.stopRequest(long)} and records the elapsed value or an error.
     *
     * <p>NOTE(review): the literals {@code 2L}, {@code -2L} and {@code Long.MIN_VALUE} compared
     * against {@code requested} look like state constants inlined from the superclass; their
     * names are not visible here. {@code onComplete}/{@code onError} store {@code 2L} while
     * {@code cancel} treats {@code Long.MIN_VALUE} as "already terminated" - confirm that the
     * two markers are intentionally distinct.
     */
    static final class RequestTrackingMonoInner<RESULT>
            extends MonoDeferredResolution<RESULT, RSocket> {

        /** Value returned by {@code Stats.startRequest()}; stays 0 until a request is started. */
        long startTime;

        RequestTrackingMonoInner(PooledWeightedRSocket parent, Payload payload, FrameType requestType) {
            super(parent, payload, requestType);
        }

        /** Stats of the owning {@link PooledWeightedRSocket}. */
        private Stats trackedStats() {
            return ((PooledWeightedRSocket) this.parent).stats;
        }

        /**
         * Resolution callback.
         *
         * @param rSocket resolved socket; when {@code null} (and {@code t} is {@code null}) this
         *                instance re-registers itself with the parent via {@code add}
         * @param t       resolution failure, or {@code null}
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void accept(RSocket rSocket, Throwable t) {
            if (this.isTerminated()) {
                return;
            }

            if (t != null) {
                ReferenceCountUtil.safeRelease(this.payload);
                // NOTE(review): this routes to the overridden onError below, which calls
                // stopRequest(startTime) and recordError although no request was started -
                // confirm intended.
                this.onError(t);
                return;
            }

            if (rSocket == null) {
                this.parent.add(this);
                return;
            }

            // Typed as Mono<?> (not Object) so that subscribe(...) below resolves.
            final Mono<?> source;
            switch (this.requestType) {
                case REQUEST_FNF:
                    source = rSocket.fireAndForget(this.payload);
                    break;
                case REQUEST_RESPONSE:
                    source = rSocket.requestResponse(this.payload);
                    break;
                case METADATA_PUSH:
                    source = rSocket.metadataPush(this.payload);
                    break;
                default:
                    Operators.error((Subscriber<?>) this.actual, new IllegalStateException("Should never happen"));
                    return;
            }

            this.startTime = trackedStats().startRequest();
            source.subscribe((CoreSubscriber) this);
        }

        /** Stops the request once, records {@code now - startTime}, then forwards completion. */
        @Override
        public void onComplete() {
            final long state = this.requested;
            if (state != 2L && REQUESTED.compareAndSet(this, state, 2L)) {
                final Stats stats = trackedStats();
                final long now = stats.stopRequest(this.startTime);
                stats.record(now - this.startTime);
                super.onComplete();
            }
        }

        /** Stops the request once, records an error with {@code 0.0}, then forwards the error. */
        @Override
        public void onError(Throwable t) {
            final long state = this.requested;
            if (state != 2L && REQUESTED.compareAndSet(this, state, 2L)) {
                final Stats stats = trackedStats();
                stats.stopRequest(this.startTime);
                stats.recordError(0.0);
                super.onError(t);
            }
        }

        /**
         * Cancels the request. When the previous state was {@code -2L} the upstream
         * subscription is cancelled and the request is recorded as stopped; otherwise this
         * instance is removed from the parent and the pending payload is released.
         */
        @Override
        public void cancel() {
            final long state = REQUESTED.getAndSet(this, Long.MIN_VALUE);
            if (state == Long.MIN_VALUE) {
                return;
            }

            if (state == -2L) {
                this.s.cancel();
                trackedStats().stopRequest(this.startTime);
            } else {
                this.parent.remove(this);
                ReferenceCountUtil.safeRelease(this.payload);
            }
        }
    }
}

