/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.IllegalReferenceCountException;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.core.PayloadValidationUtils;
import io.rsocket.core.ReassemblyUtils;
import io.rsocket.core.RequesterFrameHandler;
import io.rsocket.core.RequesterResponderSupport;
import io.rsocket.core.SendUtils;
import io.rsocket.core.StateUtils;
import io.rsocket.frame.CancelFrameCodec;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RequestInterceptor;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

final class RequestChannelRequesterFlux
extends Flux<Payload>
implements RequesterFrameHandler,
CoreSubscriber<Payload>,
Subscription,
Scannable {
    final ByteBufAllocator allocator;
    final int mtu;
    final int maxFrameLength;
    final int maxInboundPayloadSize;
    final RequesterResponderSupport requesterResponderSupport;
    final DuplexConnection connection;
    final PayloadDecoder payloadDecoder;
    final Publisher<Payload> payloadsPublisher;
    @Nullable
    final RequestInterceptor requestInterceptor;
    volatile long state;
    static final AtomicLongFieldUpdater<RequestChannelRequesterFlux> STATE = AtomicLongFieldUpdater.newUpdater(RequestChannelRequesterFlux.class, "state");
    int streamId;
    Context cachedContext;
    boolean isFirstPayload = true;
    CoreSubscriber<? super Payload> inboundSubscriber;
    Subscription outboundSubscription;
    boolean inboundDone;
    boolean outboundDone;
    CompositeByteBuf frames;

    RequestChannelRequesterFlux(Publisher<Payload> payloadsPublisher, RequesterResponderSupport requesterResponderSupport) {
        this.allocator = requesterResponderSupport.getAllocator();
        this.payloadsPublisher = payloadsPublisher;
        this.mtu = requesterResponderSupport.getMtu();
        this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
        this.maxInboundPayloadSize = requesterResponderSupport.getMaxInboundPayloadSize();
        this.requesterResponderSupport = requesterResponderSupport;
        this.connection = requesterResponderSupport.getDuplexConnection();
        this.payloadDecoder = requesterResponderSupport.getPayloadDecoder();
        this.requestInterceptor = requesterResponderSupport.getRequestInterceptor();
    }

    public void subscribe(CoreSubscriber<? super Payload> actual) {
        Objects.requireNonNull(actual, "subscribe");
        long previousState = StateUtils.markSubscribed(STATE, this);
        if (StateUtils.isSubscribedOrTerminated(previousState)) {
            IllegalStateException e = new IllegalStateException("RequestChannelFlux allows only a single Subscriber");
            RequestInterceptor requestInterceptor = this.requestInterceptor;
            if (requestInterceptor != null) {
                requestInterceptor.onReject(e, FrameType.REQUEST_CHANNEL, null);
            }
            Operators.error(actual, (Throwable)e);
            return;
        }
        this.inboundSubscriber = actual;
        this.payloadsPublisher.subscribe((Subscriber)this);
    }

    public void onSubscribe(Subscription outboundSubscription) {
        if (Operators.validate((Subscription)this.outboundSubscription, (Subscription)outboundSubscription)) {
            this.outboundSubscription = outboundSubscription;
            this.inboundSubscriber.onSubscribe((Subscription)this);
        }
    }

    public final void request(long n) {
        if (!Operators.validate((long)n)) {
            return;
        }
        long previousState = StateUtils.addRequestN(STATE, this, n);
        if (StateUtils.isTerminated(previousState)) {
            return;
        }
        if (StateUtils.hasRequested(previousState)) {
            if (StateUtils.isFirstFrameSent(previousState) && !StateUtils.isMaxAllowedRequestN(StateUtils.extractRequestN(previousState))) {
                int streamId = this.streamId;
                ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n);
                this.connection.sendFrame(streamId, requestNFrame);
            }
            return;
        }
        this.outboundSubscription.request(1L);
    }

    public void onNext(Payload p) {
        if (this.outboundDone) {
            p.release();
            return;
        }
        if (this.isFirstPayload) {
            this.isFirstPayload = false;
            long state = this.state;
            if (StateUtils.isTerminated(state)) {
                p.release();
                return;
            }
            this.sendFirstPayload(p, StateUtils.extractRequestN(state));
        } else {
            this.sendFollowingPayload(p);
        }
    }

    void sendFirstPayload(Payload firstPayload, long initialRequestN) {
        int streamId;
        int mtu = this.mtu;
        try {
            if (!PayloadValidationUtils.isValid(mtu, this.maxFrameLength, firstPayload, true)) {
                StateUtils.lazyTerminate(STATE, this);
                this.outboundSubscription.cancel();
                IllegalArgumentException e = new IllegalArgumentException(String.format("The payload is too big to be send as a single frame with a max frame length %s. Consider enabling fragmentation.", this.maxFrameLength));
                RequestInterceptor requestInterceptor = this.requestInterceptor;
                if (requestInterceptor != null) {
                    requestInterceptor.onReject(e, FrameType.REQUEST_CHANNEL, firstPayload.metadata());
                }
                firstPayload.release();
                this.inboundDone = true;
                this.inboundSubscriber.onError((Throwable)e);
                return;
            }
        }
        catch (IllegalReferenceCountException e) {
            StateUtils.lazyTerminate(STATE, this);
            this.outboundSubscription.cancel();
            RequestInterceptor requestInterceptor = this.requestInterceptor;
            if (requestInterceptor != null) {
                requestInterceptor.onReject(e, FrameType.REQUEST_CHANNEL, null);
            }
            this.inboundDone = true;
            this.inboundSubscriber.onError((Throwable)e);
            return;
        }
        RequesterResponderSupport sm = this.requesterResponderSupport;
        DuplexConnection connection = this.connection;
        ByteBufAllocator allocator = this.allocator;
        try {
            this.streamId = streamId = sm.addAndGetNextStreamId(this);
        }
        catch (Throwable t) {
            this.inboundDone = true;
            long previousState = StateUtils.markTerminated(STATE, this);
            this.outboundSubscription.cancel();
            Throwable ut = Exceptions.unwrap((Throwable)t);
            RequestInterceptor requestInterceptor = this.requestInterceptor;
            if (requestInterceptor != null) {
                requestInterceptor.onReject(ut, FrameType.REQUEST_CHANNEL, firstPayload.metadata());
            }
            firstPayload.release();
            if (!StateUtils.isTerminated(previousState)) {
                this.inboundSubscriber.onError(ut);
            }
            return;
        }
        RequestInterceptor requestInterceptor = this.requestInterceptor;
        if (requestInterceptor != null) {
            requestInterceptor.onStart(streamId, FrameType.REQUEST_CHANNEL, firstPayload.metadata());
        }
        try {
            SendUtils.sendReleasingPayload(streamId, FrameType.REQUEST_CHANNEL, initialRequestN, mtu, firstPayload, connection, allocator, false);
        }
        catch (Throwable t) {
            StateUtils.lazyTerminate(STATE, this);
            sm.remove(streamId, this);
            this.outboundSubscription.cancel();
            this.inboundDone = true;
            if (requestInterceptor != null) {
                requestInterceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
            }
            this.inboundSubscriber.onError(t);
            return;
        }
        long previousState = StateUtils.markFirstFrameSent(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            if (this.inboundDone) {
                return;
            }
            sm.remove(streamId, this);
            ReassemblyUtils.synchronizedRelease(this, previousState);
            ByteBuf cancelFrame = CancelFrameCodec.encode(allocator, streamId);
            connection.sendFrame(streamId, cancelFrame);
            if (requestInterceptor != null) {
                requestInterceptor.onCancel(streamId, FrameType.REQUEST_CHANNEL);
            }
            return;
        }
        if (StateUtils.isMaxAllowedRequestN(initialRequestN)) {
            return;
        }
        long requestN = StateUtils.extractRequestN(previousState);
        if (StateUtils.isMaxAllowedRequestN(requestN)) {
            ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN);
            connection.sendFrame(streamId, requestNFrame);
            return;
        }
        if (requestN > initialRequestN) {
            ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN - initialRequestN);
            connection.sendFrame(streamId, requestNFrame);
        }
    }

    final void sendFollowingPayload(Payload followingPayload) {
        int streamId = this.streamId;
        int mtu = this.mtu;
        try {
            if (!PayloadValidationUtils.isValid(mtu, this.maxFrameLength, followingPayload, true)) {
                followingPayload.release();
                IllegalArgumentException e = new IllegalArgumentException(String.format("The payload is too big to be send as a single frame with a max frame length %s. Consider enabling fragmentation.", this.maxFrameLength));
                if (!this.tryCancel()) {
                    Operators.onErrorDropped((Throwable)e, (Context)this.inboundSubscriber.currentContext());
                    return;
                }
                this.propagateErrorSafely(e);
                return;
            }
        }
        catch (IllegalReferenceCountException e) {
            if (!this.tryCancel()) {
                Operators.onErrorDropped((Throwable)e, (Context)this.inboundSubscriber.currentContext());
                return;
            }
            this.propagateErrorSafely(e);
            return;
        }
        try {
            SendUtils.sendReleasingPayload(streamId, FrameType.NEXT, mtu, followingPayload, this.connection, this.allocator, true);
        }
        catch (Throwable e) {
            if (!this.tryCancel()) {
                Operators.onErrorDropped((Throwable)e, (Context)this.inboundSubscriber.currentContext());
                return;
            }
            this.propagateErrorSafely(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void propagateErrorSafely(Throwable t) {
        if (!this.inboundDone) {
            RequestChannelRequesterFlux requestChannelRequesterFlux = this;
            synchronized (requestChannelRequesterFlux) {
                if (!this.inboundDone) {
                    RequestInterceptor interceptor = this.requestInterceptor;
                    if (interceptor != null) {
                        interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, t);
                    }
                    this.inboundDone = true;
                    this.inboundSubscriber.onError(t);
                } else {
                    Operators.onErrorDropped((Throwable)t, (Context)this.inboundSubscriber.currentContext());
                }
            }
        } else {
            Operators.onErrorDropped((Throwable)t, (Context)this.inboundSubscriber.currentContext());
        }
    }

    public final void cancel() {
        if (!this.tryCancel()) {
            return;
        }
        RequestInterceptor requestInterceptor = this.requestInterceptor;
        if (requestInterceptor != null) {
            requestInterceptor.onCancel(this.streamId, FrameType.REQUEST_CHANNEL);
        }
    }

    boolean tryCancel() {
        long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            return false;
        }
        this.outboundSubscription.cancel();
        if (!StateUtils.isFirstFrameSent(previousState)) {
            return false;
        }
        int streamId = this.streamId;
        this.requesterResponderSupport.remove(streamId, this);
        ReassemblyUtils.synchronizedRelease(this, previousState);
        ByteBuf cancelFrame = CancelFrameCodec.encode(this.allocator, streamId);
        this.connection.sendFrame(streamId, cancelFrame);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onError(Throwable t) {
        if (this.outboundDone) {
            Operators.onErrorDropped((Throwable)t, (Context)this.inboundSubscriber.currentContext());
            return;
        }
        this.outboundDone = true;
        long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState) || StateUtils.isOutboundTerminated(previousState)) {
            Operators.onErrorDropped((Throwable)t, (Context)this.inboundSubscriber.currentContext());
            return;
        }
        if (!StateUtils.isFirstFrameSent(previousState)) {
            this.inboundSubscriber.onError(t);
            return;
        }
        ReassemblyUtils.synchronizedRelease(this, previousState);
        int streamId = this.streamId;
        this.requesterResponderSupport.remove(streamId, this);
        ByteBuf errorFrame = ErrorFrameCodec.encode(this.allocator, streamId, t);
        this.connection.sendFrame(streamId, errorFrame);
        if (!StateUtils.isInboundTerminated(previousState)) {
            RequestChannelRequesterFlux requestChannelRequesterFlux = this;
            synchronized (requestChannelRequesterFlux) {
                RequestInterceptor interceptor = this.requestInterceptor;
                if (interceptor != null) {
                    interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
                }
                this.inboundDone = true;
                this.inboundSubscriber.onError(t);
            }
        } else {
            Operators.onErrorDropped((Throwable)t, (Context)this.inboundSubscriber.currentContext());
        }
    }

    public void onComplete() {
        RequestInterceptor interceptor;
        if (this.outboundDone) {
            return;
        }
        this.outboundDone = true;
        long previousState = StateUtils.markOutboundTerminated(STATE, this, true);
        if (StateUtils.isTerminated(previousState) || StateUtils.isOutboundTerminated(previousState)) {
            return;
        }
        if (!StateUtils.isFirstFrameSent(previousState)) {
            this.inboundSubscriber.onError((Throwable)new CancellationException("Empty Source"));
            return;
        }
        int streamId = this.streamId;
        boolean isInboundTerminated = StateUtils.isInboundTerminated(previousState);
        if (isInboundTerminated) {
            this.requesterResponderSupport.remove(streamId, this);
        }
        ByteBuf completeFrame = PayloadFrameCodec.encodeComplete(this.allocator, streamId);
        this.connection.sendFrame(streamId, completeFrame);
        if (isInboundTerminated && (interceptor = this.requestInterceptor) != null) {
            interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, null);
        }
    }

    @Override
    public final void handleComplete() {
        if (this.inboundDone) {
            return;
        }
        this.inboundDone = true;
        long previousState = StateUtils.markInboundTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            return;
        }
        if (StateUtils.isOutboundTerminated(previousState)) {
            this.requesterResponderSupport.remove(this.streamId, this);
            RequestInterceptor interceptor = this.requestInterceptor;
            if (interceptor != null) {
                interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, null);
            }
        }
        this.inboundSubscriber.onComplete();
    }

    @Override
    public final void handleError(Throwable cause) {
        if (this.inboundDone) {
            Operators.onErrorDropped((Throwable)cause, (Context)this.inboundSubscriber.currentContext());
            return;
        }
        this.inboundDone = true;
        long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            Operators.onErrorDropped((Throwable)cause, (Context)this.inboundSubscriber.currentContext());
            return;
        }
        if (StateUtils.isInboundTerminated(previousState)) {
            RequestInterceptor interceptor = this.requestInterceptor;
            if (interceptor != null) {
                interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, cause);
            }
            Operators.onErrorDropped((Throwable)cause, (Context)this.inboundSubscriber.currentContext());
            return;
        }
        ReassemblyUtils.release(this, previousState);
        int streamId = this.streamId;
        this.requesterResponderSupport.remove(streamId, this);
        this.outboundSubscription.cancel();
        RequestInterceptor interceptor = this.requestInterceptor;
        if (interceptor != null) {
            interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, cause);
        }
        this.inboundSubscriber.onError(cause);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public final void handlePayload(Payload value) {
        RequestChannelRequesterFlux requestChannelRequesterFlux = this;
        synchronized (requestChannelRequesterFlux) {
            if (this.inboundDone) {
                value.release();
                return;
            }
            this.inboundSubscriber.onNext((Object)value);
        }
    }

    @Override
    public void handleRequestN(long n) {
        this.outboundSubscription.request(n);
    }

    @Override
    public void handleCancel() {
        RequestInterceptor interceptor;
        if (this.outboundDone) {
            return;
        }
        long previousState = StateUtils.markOutboundTerminated(STATE, this, false);
        if (StateUtils.isTerminated(previousState) || StateUtils.isOutboundTerminated(previousState)) {
            return;
        }
        boolean inboundTerminated = StateUtils.isInboundTerminated(previousState);
        if (inboundTerminated) {
            this.requesterResponderSupport.remove(this.streamId, this);
        }
        this.outboundSubscription.cancel();
        if (inboundTerminated && (interceptor = this.requestInterceptor) != null) {
            interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, null);
        }
    }

    @Override
    public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload) {
        ReassemblyUtils.handleNextSupport(STATE, this, this, this.inboundSubscriber, this.payloadDecoder, this.allocator, this.maxInboundPayloadSize, frame, hasFollows, isLastPayload);
    }

    @NonNull
    public Context currentContext() {
        long state = this.state;
        if (StateUtils.isSubscribedOrTerminated(state)) {
            Context contextWithDiscard;
            this.cachedContext = contextWithDiscard = this.inboundSubscriber.currentContext().putAll(SendUtils.DISCARD_CONTEXT);
            return contextWithDiscard;
        }
        return Context.empty();
    }

    @Override
    public CompositeByteBuf getFrames() {
        return this.frames;
    }

    @Override
    public void setFrames(CompositeByteBuf byteBuf) {
        this.frames = byteBuf;
    }

    @Nullable
    public Object scanUnsafe(Scannable.Attr key) {
        long state = this.state;
        if (key == Scannable.Attr.TERMINATED) {
            return StateUtils.isTerminated(state);
        }
        if (key == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM) {
            return state;
        }
        return null;
    }

    @NonNull
    public String stepName() {
        return "source(RequestChannelFlux)";
    }
}

