/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.DecoderResultProvider;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.Operators;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyPipeline;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

final class ChannelOperationsHandler
extends ChannelDuplexHandler
implements NettyPipeline.SendOptions,
ChannelFutureListener {
    final PublisherSender inner = new PublisherSender(this);
    final int prefetch;
    final ConnectionObserver listener;
    final ChannelOperations.OnSetup opsFactory;
    BiPredicate<ChannelFuture, Object> pendingWriteOffer;
    Queue<?> pendingWrites;
    ChannelHandlerContext ctx;
    boolean flushOnEach;
    boolean flushOnEachWithEventLoop;
    long pendingBytes;
    private Channel.Unsafe unsafe;
    volatile boolean innerActive;
    volatile boolean removed;
    volatile int wip;
    volatile long scheduledFlush;
    static final AtomicIntegerFieldUpdater<ChannelOperationsHandler> WIP = AtomicIntegerFieldUpdater.newUpdater(ChannelOperationsHandler.class, "wip");
    static final AtomicLongFieldUpdater<ChannelOperationsHandler> SCHEDULED_FLUSH = AtomicLongFieldUpdater.newUpdater(ChannelOperationsHandler.class, "scheduledFlush");
    static final Logger log = Loggers.getLogger(ChannelOperationsHandler.class);
    static final BiConsumer<?, ? super ByteBuf> NOOP_ENCODER = (a, b) -> {};

    /**
     * Creates a handler with a fixed prefetch of 32.
     *
     * @param opsFactory factory used by {@link #channelActive} to create the
     *                   {@link ChannelOperations} for a newly active channel
     * @param listener   observer that receives connection state changes and
     *                   uncaught exceptions
     */
    ChannelOperationsHandler(ChannelOperations.OnSetup opsFactory, ConnectionObserver listener) {
        this.opsFactory = opsFactory;
        this.listener = listener;
        this.prefetch = 32;
    }

    /**
     * Signals {@code CONNECTED} to the listener, then asks the operations factory for a
     * {@link ChannelOperations}. When one is produced it is bound and {@code CONFIGURED}
     * is signalled with it.
     *
     * @param ctx the handler context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Connection connection = Connection.from(ctx.channel());
        this.listener.onStateChange(connection, ConnectionObserver.State.CONNECTED);

        ChannelOperations<?, ?> created = this.opsFactory.create(connection, this.listener, null);
        if (created == null) {
            // The factory declined to create operations: nothing to bind or configure.
            return;
        }
        created.bind();
        this.listener.onStateChange(created, ConnectionObserver.State.CONFIGURED);
    }

    /**
     * Propagates the channel close either to the attached {@link ChannelOperations} or,
     * when none is attached, to the listener as a {@code DISCONNECTING} state change.
     * Non-fatal failures are routed to {@link #exceptionCaught}.
     *
     * @param ctx the handler context of the channel that became inactive
     */
    @Override
    public final void channelInactive(ChannelHandlerContext ctx) {
        try {
            ChannelOperations<?, ?> attached = ChannelOperations.get(ctx.channel());
            if (attached == null) {
                this.listener.onStateChange(Connection.from(ctx.channel()), ConnectionObserver.State.DISCONNECTING);
                return;
            }
            attached.onInboundClose();
        }
        catch (Throwable failure) {
            Exceptions.throwIfFatal(failure);
            this.exceptionCaught(ctx, failure);
        }
    }

    /**
     * Hands an inbound message to the attached {@link ChannelOperations}. When no
     * operations are attached the message is (optionally) logged at debug level and
     * released. {@code null} and empty buffers are ignored outright.
     *
     * @param ctx the handler context the message was read on
     * @param msg the inbound message
     */
    @Override
    public final void channelRead(ChannelHandlerContext ctx, Object msg) {
        boolean nothingToRead = msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf;
        if (nothingToRead) {
            return;
        }
        try {
            ChannelOperations<?, ?> attached = ChannelOperations.get(ctx.channel());
            if (attached != null) {
                attached.onInboundNext(ctx, msg);
                return;
            }

            // Nobody is listening: describe what is being dropped, then release it.
            if (log.isDebugEnabled()) {
                String dropped = msg.toString();
                if (msg instanceof DecoderResultProvider) {
                    DecoderResult outcome = ((DecoderResultProvider)msg).decoderResult();
                    if (outcome.isFailure()) {
                        log.debug(ReactorNetty.format(ctx.channel(), "Decoding failed: " + msg + " : "), outcome.cause());
                    }
                }
                if (msg instanceof ByteBufHolder) {
                    if (((ByteBufHolder)msg).content() != Unpooled.EMPTY_BUFFER) {
                        // Prefer the decoded payload text over the holder's toString().
                        dropped = ((ByteBufHolder)msg).content().toString(Charset.defaultCharset());
                    }
                }
                log.debug(ReactorNetty.format(ctx.channel(), "No ChannelOperation attached. Dropping: {}"), dropped);
            }
            ReferenceCountUtil.release(msg);
        }
        catch (Throwable failure) {
            Exceptions.throwIfFatal(failure);
            this.exceptionCaught(ctx, failure);
            // safeRelease: the message may already have been released before the failure.
            ReferenceCountUtil.safeRelease(msg);
        }
    }

    /**
     * Logs the new writability at debug level and resumes draining queued writes.
     *
     * @param ctx the handler context whose channel changed writability
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (log.isDebugEnabled()) {
            Channel channel = ctx.channel();
            log.debug(ReactorNetty.format(channel, "Write state change {}"), channel.isWritable());
        }
        drain();
    }

    /**
     * Routes an error to the attached {@link ChannelOperations}, or to the listener's
     * uncaught-exception hook when no operations are attached. JVM-fatal errors are
     * rethrown first.
     *
     * @param ctx the handler context the error was raised on
     * @param err the error to propagate
     */
    @Override
    public final void exceptionCaught(ChannelHandlerContext ctx, Throwable err) {
        Exceptions.throwIfJvmFatal(err);
        ChannelOperations<?, ?> attached = ChannelOperations.get(ctx.channel());
        if (attached == null) {
            this.listener.onUncaughtException(Connection.from(ctx.channel()), err);
            return;
        }
        attached.onInboundError(err);
    }

    /**
     * Treats an outbound flush request as a trigger to drain the pending-write queue;
     * the flush is not forwarded directly from here.
     *
     * @param ctx the handler context (unused)
     */
    @Override
    public void flush(ChannelHandlerContext ctx) {
        drain();
    }

    /**
     * Captures the handler context and the channel's {@code Unsafe}, then primes the
     * inner sender with the configured prefetch.
     *
     * @param ctx the context this handler was added to
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // The context must be stored before the inner sender is touched, since it reads parent.ctx.
        this.ctx = ctx;
        Channel channel = ctx.channel();
        this.unsafe = channel.unsafe();
        this.inner.request(this.prefetch);
    }

    /**
     * On first removal, marks the handler as removed, cancels the inner sender and runs
     * {@link #drain()}, which discards queued writes once {@code removed} is set.
     * Subsequent calls are no-ops.
     *
     * @param ctx the context this handler was removed from (unused)
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        if (this.removed) {
            return;
        }
        this.removed = true;
        this.inner.cancel();
        this.drain();
    }

    /**
     * Consumes {@link NettyPipeline.SendOptionsChangeEvent} by applying its configurator
     * to this handler; every other user event is forwarded down the pipeline.
     *
     * @param ctx the handler context the event was triggered on
     * @param evt the user event
     */
    @Override
    public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (log.isTraceEnabled()) {
            log.trace(ReactorNetty.format(ctx.channel(), "End of the pipeline, User event {}"), evt);
        }
        if (!(evt instanceof NettyPipeline.SendOptionsChangeEvent)) {
            ctx.fireUserEventTriggered(evt);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(ctx.channel(), "New sending options"));
        }
        NettyPipeline.SendOptionsChangeEvent change = (NettyPipeline.SendOptionsChangeEvent)evt;
        change.configurator().accept(this);
    }

    /**
     * Enqueues an outbound message together with its promise. Nothing is written to the
     * channel here; queued entries are processed by {@link #drain()}.
     *
     * <p>The pending-write queue is created lazily on first use. The same object is used
     * both as a {@link Queue} and as a {@link BiPredicate} that offers a
     * (promise, message) pair, so the cast below relies on the queue returned by
     * {@code Queues.unbounded()} also implementing {@code BiPredicate}.
     *
     * <p>If the queue rejects the pair, the message is released before the promise is
     * failed: once rejected it is referenced by nothing else, so a reference-counted
     * message would otherwise leak.
     *
     * @param ctx     the handler context the write was issued on
     * @param msg     the message to enqueue
     * @param promise the promise to complete once the message has been handled
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (log.isDebugEnabled() && msg != ChannelOperations.TERMINATED_OPS) {
            log.debug(ReactorNetty.format(ctx.channel(), "Writing object {}"), msg);
        }
        if (this.pendingWrites == null) {
            this.pendingWrites = Queues.unbounded().get();
            this.pendingWriteOffer = (BiPredicate)((Object)this.pendingWrites);
        }
        if (!this.pendingWriteOffer.test(promise, msg)) {
            // safeRelease is a no-op for non reference-counted messages and never throws,
            // so the promise below is always failed.
            ReferenceCountUtil.safeRelease(msg);
            promise.setFailure(new IllegalStateException("Send Queue full?!"));
        }
    }

    /**
     * Switches back to boundary flushing by clearing both flush-on-each flags.
     *
     * <p>{@code flushOnEachWithEventLoop} is cleared as well because {@code doWrite}
     * consults it on every immediate-flush path, not only when {@code flushOnEach} is
     * set. Left stale after an earlier {@code flushOnEach(true)}, boundary flushes would
     * keep being scheduled on the event loop instead of being issued directly.
     *
     * @return this options instance, for chaining
     */
    @Override
    public NettyPipeline.SendOptions flushOnBoundary() {
        this.flushOnEach = false;
        this.flushOnEachWithEventLoop = false;
        return this;
    }

    /**
     * Enables flushing after every write.
     *
     * @param withEventLoop when {@code true}, {@code doWrite} schedules the flush on the
     *                      event loop while the channel is writable instead of flushing
     *                      inline
     * @return this options instance, for chaining
     */
    @Override
    public NettyPipeline.SendOptions flushOnEach(boolean withEventLoop) {
        this.flushOnEachWithEventLoop = withEventLoop;
        this.flushOnEach = true;
        return this;
    }

    /**
     * Write-completion callback: a successful write requests one more element from the
     * inner sender. Failed writes are ignored here.
     *
     * @param future the completed write future
     */
    @Override
    public void operationComplete(ChannelFuture future) {
        if (!future.isSuccess()) {
            return;
        }
        this.inner.request(1L);
    }

    /**
     * Writes a message to the channel and decides whether to flush.
     *
     * <p>The write is flushed immediately when flush-on-each is enabled, when this is a
     * direct write ({@code inner == null}) and the pending queue is empty, or when the
     * channel is not writable. Otherwise the message size is added to
     * {@code pendingBytes} and a flush is issued only if the channel became unwritable
     * as a result of the write.
     *
     * @param msg     the message to write
     * @param promise the promise passed to the channel write
     * @param inner   the publisher sender on whose behalf the write happens, or
     *                {@code null} for a direct write from the drain loop
     * @return the future returned by the channel write
     */
    ChannelFuture doWrite(Object msg, ChannelPromise promise, @Nullable PublisherSender inner) {
        Channel channel = this.ctx.channel();
        boolean flushImmediately = this.flushOnEach
                || (inner == null && this.pendingWrites.isEmpty())
                || !channel.isWritable();

        if (flushImmediately) {
            this.pendingBytes = 0L;
            ChannelFuture written = this.ctx.write(msg, promise);
            boolean deferToEventLoop = this.flushOnEachWithEventLoop && channel.isWritable();
            if (deferToEventLoop) {
                this.scheduleFlush();
            } else {
                this.ctx.flush();
            }
            return written;
        }

        // Buffered path: account for the payload size of the known message kinds.
        if (msg instanceof ByteBuf) {
            ByteBuf buffer = (ByteBuf)msg;
            this.pendingBytes = Operators.addCap(this.pendingBytes, buffer.readableBytes());
        } else if (msg instanceof ByteBufHolder) {
            ByteBufHolder holder = (ByteBufHolder)msg;
            this.pendingBytes = Operators.addCap(this.pendingBytes, holder.content().readableBytes());
        } else if (msg instanceof FileRegion) {
            FileRegion region = (FileRegion)msg;
            this.pendingBytes = Operators.addCap(this.pendingBytes, region.count());
        }
        if (log.isTraceEnabled()) {
            log.trace(ReactorNetty.format(channel, "Pending write size = {}"), this.pendingBytes);
        }

        ChannelFuture written = this.ctx.write(msg, promise);
        if (!channel.isWritable()) {
            // The write filled the outbound buffer: force a flush.
            this.pendingBytes = 0L;
            this.ctx.flush();
        }
        return written;
    }

    /**
     * Schedules a flush on the channel's event loop, coalescing concurrent requests.
     *
     * <p>Only the caller that moves {@code scheduledFlush} away from zero submits a
     * task. That task flushes while the outbound buffer has pending bytes and keeps
     * looping until it has subtracted every request counted in the meantime.
     */
    void scheduleFlush() {
        if (SCHEDULED_FLUSH.getAndIncrement(this) != 0L) {
            // A flush task is already scheduled; it will observe this increment.
            return;
        }
        this.ctx.channel().eventLoop().execute(() -> {
            long outstanding = this.scheduledFlush;
            for (;;) {
                if (this.hasPendingWriteBytes()) {
                    this.ctx.flush();
                }
                outstanding = SCHEDULED_FLUSH.addAndGet(this, -outstanding);
                if (outstanding == 0L) {
                    break;
                }
            }
        });
    }

    /**
     * Empties the pending-write queue, which stores entries as (promise, value) pairs.
     *
     * <p>For every pair whose value is not {@code TERMINATED_OPS}, the value is released
     * and the promise is failed with an {@link AbortedException}. Pairs carrying
     * {@code TERMINATED_OPS} are dropped without touching their promise. If the head of
     * a pair is not a {@link ChannelPromise}, the cast failure is fired through the
     * pipeline and discarding stops.
     */
    void discard() {
        for (;;) {
            if (this.pendingWrites == null || this.pendingWrites.isEmpty()) {
                return;
            }

            Object head = this.pendingWrites.poll();
            ChannelPromise pending;
            try {
                pending = (ChannelPromise)head;
            }
            catch (Throwable castFailure) {
                this.ctx.fireExceptionCaught(castFailure);
                return;
            }

            Object payload = this.pendingWrites.poll();
            if (payload != ChannelOperations.TERMINATED_OPS) {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(this.ctx.channel(), "Terminated ChannelOperation. Dropping Pending Write: {}"), payload);
                }
                ReferenceCountUtil.release(payload);
                pending.tryFailure(new AbortedException("Connection has been closed"));
            }
        }
    }

    /**
     * Serialized drain loop over the pending-write queue.
     *
     * <p>{@code WIP} acts as a work-in-progress counter: only the caller that moves it
     * away from zero runs the loop; other callers merely increment it, which makes the
     * running loop go around again before it exits.
     *
     * <p>The queue stores entries as pairs: a {@link ChannelFuture} followed by the
     * value to handle (a message, a {@link Publisher}, {@code TERMINATED_OPS} or the
     * {@code PENDING_WRITES} marker).
     */
    void drain() {
        if (WIP.getAndIncrement(this) == 0) {
            while (true) {
                boolean empty;
                ChannelFuture future;
                if (this.removed) {
                    // Handler removed: fail/release everything that is still queued.
                    // NOTE(review): returns without decrementing WIP, so later drain() calls only
                    // increment the counter and return — presumably intended as terminal.
                    this.discard();
                    return;
                }
                // Nothing can be written right now: no queue yet, a publisher is being sent, or the
                // channel is not writable.
                if (this.pendingWrites == null || this.innerActive || !this.ctx.channel().isWritable()) {
                    if (!this.ctx.channel().isWritable() && this.hasPendingWriteBytes()) {
                        // Push out buffered bytes so the channel can become writable again.
                        this.ctx.flush();
                    }
                    // Exit unless another drain() was requested while this loop was running.
                    if (WIP.decrementAndGet(this) != 0) continue;
                    break;
                }
                // First element of the pair: the future/promise.
                Object v = this.pendingWrites.poll();
                try {
                    future = (ChannelFuture)v;
                }
                catch (Throwable e) {
                    // NOTE(review): this return also leaves WIP non-zero, which stops all further
                    // draining — confirm this is intended.
                    this.ctx.fireExceptionCaught(e);
                    return;
                }
                boolean bl = empty = future == null;
                if (empty) {
                    // Queue was empty.
                    if (WIP.decrementAndGet(this) != 0) continue;
                    break;
                }
                // Second element of the pair: the value associated with the future.
                v = this.pendingWrites.poll();
                if (!this.innerActive && v == PublisherSender.PENDING_WRITES) {
                    // Marker queued by PublisherSender for a last write that was not yet done.
                    boolean last = this.pendingWrites.isEmpty();
                    if (!future.isDone() && this.hasPendingWriteBytes()) {
                        this.ctx.flush();
                        // Still not done after flushing: re-queue the marker to check again later.
                        if (!future.isDone() && this.hasPendingWriteBytes() && !this.pendingWriteOffer.test(future, v) && future instanceof ChannelPromise) {
                            ((ChannelPromise)future).setFailure(new IllegalStateException("Send Queue full?!"));
                        }
                    }
                    // When the marker was the last entry, leave the loop (unless more work arrived).
                    if (!last || WIP.decrementAndGet(this) != 0) continue;
                    break;
                }
                // Entries whose future is not a promise cannot be completed from here; skip them.
                if (!(future instanceof ChannelPromise)) continue;
                ChannelPromise promise = (ChannelPromise)future;
                if (v == ChannelOperations.TERMINATED_OPS) {
                    // Termination marker: nothing to write, just complete its promise.
                    promise.trySuccess();
                    continue;
                }
                if (v instanceof Publisher) {
                    Publisher p = (Publisher)v;
                    if (p instanceof Callable) {
                        // Publisher that can be resolved synchronously: obtain its single value
                        // directly instead of subscribing.
                        Object vr;
                        Callable supplier = (Callable)((Object)p);
                        try {
                            vr = supplier.call();
                        }
                        catch (Throwable e) {
                            promise.setFailure(e);
                            continue;
                        }
                        if (vr == null) {
                            // Empty publisher.
                            promise.setSuccess();
                            continue;
                        }
                        if (this.inner.unbounded) {
                            // Inner sender has unbounded demand: write the value directly.
                            this.doWrite(vr, promise, null);
                            continue;
                        }
                        // Otherwise route the value through the inner sender as a scalar subscription.
                        this.innerActive = true;
                        this.inner.promise = promise;
                        this.inner.onSubscribe(Operators.scalarSubscription(this.inner, vr));
                        continue;
                    }
                    // Regular publisher: the inner sender subscribes and writes each element; the
                    // loop idles on innerActive until the sender clears it.
                    this.innerActive = true;
                    this.inner.promise = promise;
                    p.subscribe(this.inner);
                    continue;
                }
                // Plain message: write it directly.
                this.doWrite(v, promise, null);
            }
        }
    }

    /**
     * Reports whether the channel's outbound buffer holds bytes that are not yet flushed.
     *
     * @return {@code true} if an outbound buffer exists and its total pending write
     *         bytes is positive; {@code false} otherwise
     */
    private boolean hasPendingWriteBytes() {
        ChannelOutboundBuffer outbound = this.unsafe.outboundBuffer();
        if (outbound == null) {
            return false;
        }
        return outbound.totalPendingWriteBytes() > 0L;
    }

    /**
     * Marker type for the pending-write queue. Its single instance,
     * {@code PublisherSender.PENDING_WRITES}, is queued next to a write future that was
     * not yet done when a publisher terminated.
     */
    private static final class PendingWritesOnCompletion {
        private PendingWritesOnCompletion() {
        }

        /** Returns a fixed label so the marker is recognisable in logs. */
        @Override
        public String toString() {
            return "[Pending Writes on Completion]";
        }
    }

    static final class PublisherSender
    implements CoreSubscriber<Object>,
    Subscription,
    ChannelFutureListener,
    Consumer<ChannelFuture> {
        final ChannelOperationsHandler parent;
        volatile Subscription missedSubscription;
        volatile long missedRequested;
        volatile long missedProduced;
        volatile int wip;
        boolean inactive;
        long requested;
        boolean unbounded;
        Subscription actual;
        long produced;
        ChannelPromise promise;
        ChannelFuture lastWrite;
        boolean lastThreadInEventLoop;
        static final AtomicReferenceFieldUpdater<PublisherSender, Subscription> MISSED_SUBSCRIPTION = AtomicReferenceFieldUpdater.newUpdater(PublisherSender.class, Subscription.class, "missedSubscription");
        static final AtomicLongFieldUpdater<PublisherSender> MISSED_REQUESTED = AtomicLongFieldUpdater.newUpdater(PublisherSender.class, "missedRequested");
        static final AtomicLongFieldUpdater<PublisherSender> MISSED_PRODUCED = AtomicLongFieldUpdater.newUpdater(PublisherSender.class, "missedProduced");
        static final AtomicIntegerFieldUpdater<PublisherSender> WIP = AtomicIntegerFieldUpdater.newUpdater(PublisherSender.class, "wip");
        private static final PendingWritesOnCompletion PENDING_WRITES = new PendingWritesOnCompletion();

        /**
         * Creates the sender bound to its owning handler.
         *
         * @param parent the handler whose context, write queue and flush state this sender uses
         */
        PublisherSender(ChannelOperationsHandler parent) {
            this.parent = parent;
        }

        /**
         * Requests cancellation of the in-flight publisher, but only when the given
         * future is the promise currently being served. Cancellation is signalled by
         * installing the cancelled-subscription marker as the missed subscription and
         * draining.
         *
         * @param cf the future identifying the send to cancel
         */
        @Override
        public void accept(ChannelFuture cf) {
            if (this.promise != cf) {
                return;
            }
            boolean marked = MISSED_SUBSCRIPTION.compareAndSet(this, null, Operators.cancelledSubscription());
            if (marked) {
                this.drain();
            }
        }

        /**
         * Resolves the subscriber context from the current promise.
         *
         * @return the result of applying the promise to {@code null} when the promise
         *         also implements {@link Function}; otherwise an empty context
         */
        @Override
        public Context currentContext() {
            ChannelPromise current = this.promise;
            return current instanceof Function
                    ? (Context)((Function)((Object)current)).apply(null)
                    : Context.empty();
        }

        /**
         * Permanently deactivates this sender on the first call and drains so that the
         * drain loop cancels any current subscription. Later calls do nothing.
         */
        @Override
        public final void cancel() {
            if (this.inactive) {
                return;
            }
            this.inactive = true;
            this.drain();
        }

        /**
         * Handles completion of the publisher being sent: flushes what was written and
         * completes the send promise once the last write has finished.
         */
        @Override
        public void onComplete() {
            long p = this.produced;
            ChannelFuture f = this.lastWrite;
            // Let the parent's drain loop move on to the next queued entry.
            this.parent.innerActive = false;
            // Something was written and may still be unflushed (or the last write was issued
            // outside the event loop).
            if (p != 0L && (this.parent.pendingBytes > 0L || this.parent.hasPendingWriteBytes() || !this.lastThreadInEventLoop)) {
                if (this.parent.ctx.channel().isActive()) {
                    this.parent.pendingBytes = 0L;
                    if (this.lastThreadInEventLoop) {
                        this.parent.ctx.flush();
                    } else {
                        // Last write was handed to the event loop; flush there so it runs after it.
                        this.parent.ctx.channel().eventLoop().execute(() -> this.parent.ctx.flush());
                    }
                } else {
                    // Channel is gone: the written data cannot be flushed.
                    this.promise.setFailure(new AbortedException("Connection has been closed"));
                    return;
                }
            }
            if (f != null) {
                // The last write is not done yet: queue a PENDING_WRITES marker so the parent's
                // drain loop flushes again for it.
                if (!(f.isDone() || !this.parent.hasPendingWriteBytes() && this.lastThreadInEventLoop)) {
                    EventLoop eventLoop = this.parent.ctx.channel().eventLoop();
                    if (eventLoop.inEventLoop()) {
                        if (!this.parent.pendingWriteOffer.test(f, PENDING_WRITES) && f instanceof ChannelPromise) {
                            ((ChannelPromise)f).setFailure(new IllegalStateException("Send Queue full?!"));
                        }
                    } else {
                        eventLoop.execute(() -> {
                            // Re-check on the event loop: the write may have completed meanwhile.
                            if (!f.isDone() && this.parent.hasPendingWriteBytes() && !this.parent.pendingWriteOffer.test(f, PENDING_WRITES) && f instanceof ChannelPromise) {
                                ((ChannelPromise)f).setFailure(new IllegalStateException("Send Queue full?!"));
                            }
                        });
                    }
                }
                // operationComplete(...) settles the send promise when the last write finishes.
                f.addListener(this);
            } else {
                // Nothing was ever written: settle immediately and resume the parent's queue.
                this.produced = 0L;
                this.produced(p);
                this.promise.setSuccess();
                this.parent.drain();
            }
        }

        /**
         * Handles a publisher error: flushes what was already written and fails the send
         * promise with {@code t}, once the last write (if any) has finished.
         *
         * @param t the error signalled by the publisher
         */
        @Override
        public void onError(Throwable t) {
            long p = this.produced;
            ChannelFuture f = this.lastWrite;
            // Let the parent's drain loop move on to the next queued entry.
            this.parent.innerActive = false;
            if (p != 0L) {
                if (this.parent.ctx.channel().isActive()) {
                    // NOTE(review): unlike onComplete, parent.pendingBytes is not reset before this
                    // flush — confirm whether that is intentional.
                    if (this.lastThreadInEventLoop) {
                        this.parent.ctx.flush();
                    } else {
                        this.parent.ctx.channel().eventLoop().execute(() -> this.parent.ctx.flush());
                    }
                } else {
                    // Channel is gone: report the abort rather than the publisher error.
                    this.promise.setFailure(new AbortedException("Connection has been closed"));
                    return;
                }
            }
            if (f != null) {
                // The last write is not done yet: queue a PENDING_WRITES marker so the parent's
                // drain loop flushes again for it.
                if (!(f.isDone() || !this.parent.hasPendingWriteBytes() && this.lastThreadInEventLoop)) {
                    EventLoop eventLoop = this.parent.ctx.channel().eventLoop();
                    if (eventLoop.inEventLoop()) {
                        if (!this.parent.pendingWriteOffer.test(f, PENDING_WRITES) && f instanceof ChannelPromise) {
                            ((ChannelPromise)f).setFailure(new IllegalStateException("Send Queue full?!"));
                        }
                    } else {
                        // NOTE(review): onComplete re-checks f.isDone()/hasPendingWriteBytes() inside
                        // the scheduled task; this one offers unconditionally.
                        eventLoop.execute(() -> {
                            if (!this.parent.pendingWriteOffer.test(f, PENDING_WRITES) && f instanceof ChannelPromise) {
                                ((ChannelPromise)f).setFailure(new IllegalStateException("Send Queue full?!"));
                            }
                        });
                    }
                }
                // Fail the send promise once the last write settles; a write failure becomes the
                // primary cause with the publisher error attached as suppressed.
                f.addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)future -> {
                    this.produced = 0L;
                    this.produced(p);
                    if (!future.isSuccess()) {
                        this.promise.setFailure(Exceptions.addSuppressed(future.cause(), t));
                        return;
                    }
                    this.promise.setFailure(t);
                }));
            } else {
                // Nothing was ever written: fail immediately and resume the parent's queue.
                this.produced = 0L;
                this.produced(p);
                this.promise.setFailure(t);
                this.parent.drain();
            }
        }

        /**
         * Writes one element with a fresh promise and remembers that promise as the
         * last write.
         *
         * <p>The write runs inline when there is no earlier write, when the earlier
         * write was issued on the event loop, or when it has already completed.
         * Otherwise it is submitted to the event loop.
         *
         * @param t the element to write
         */
        @Override
        public void onNext(Object t) {
            ChannelPromise writePromise = this.parent.ctx.newPromise();
            boolean writeInline = this.lastWrite == null || this.lastThreadInEventLoop || this.lastWrite.isDone();
            if (writeInline) {
                this.onNextInternal(t, writePromise);
                // Record where this write actually ran, for the next element's decision.
                this.lastThreadInEventLoop = this.parent.ctx.channel().eventLoop().inEventLoop();
            } else {
                this.parent.ctx.channel().eventLoop().execute(() -> this.onNextInternal(t, writePromise));
                this.lastThreadInEventLoop = false;
            }
            this.lastWrite = writePromise;
        }

        /**
         * Counts and writes one element, then arranges for the next one: requested
         * immediately while the channel is writable, otherwise requested by the parent
         * handler once this write completes successfully.
         *
         * @param t       the element to write
         * @param promise the promise tracking this write
         */
        private void onNextInternal(Object t, ChannelPromise promise) {
            this.produced++;
            this.parent.doWrite(t, promise, this);
            if (!this.parent.ctx.channel().isWritable()) {
                // Back off: the parent's operationComplete requests one more on success.
                promise.addListener(this.parent);
                return;
            }
            this.request(1L);
        }

        /**
         * Installs a new upstream subscription.
         *
         * <p>If this sender is inactive the subscription is cancelled immediately.
         * Otherwise it is installed on the fast path when no other work is in progress,
         * or parked in {@code missedSubscription} for the drain loop to pick up.
         *
         * @param s the upstream subscription
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (this.inactive) {
                s.cancel();
                return;
            }
            Objects.requireNonNull(s);
            // Fast path: no concurrent work, take the WIP guard and install directly.
            if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                this.actual = s;
                long r = this.requested;
                // Release the guard; if signals were missed in the meantime, process them.
                if (WIP.decrementAndGet(this) != 0) {
                    this.drainLoop();
                }
                // Replay the outstanding demand to the new subscription.
                if (r != 0L) {
                    s.request(r);
                }
                return;
            }
            // Slow path: hand the subscription to whoever holds the guard.
            MISSED_SUBSCRIPTION.set(this, s);
            this.drain();
        }

        /**
         * Completion callback for the last write of a completed publisher: reports the
         * produced count, then settles the send promise with the outcome of that write.
         *
         * @param future the completed last-write future
         */
        @Override
        public void operationComplete(ChannelFuture future) {
            long emitted = this.produced;
            this.produced = 0L;
            this.produced(emitted);
            if (!future.isSuccess()) {
                this.promise.setFailure(future.cause());
                return;
            }
            this.promise.setSuccess();
        }

        /**
         * Adds demand and forwards it to the current upstream subscription, if any.
         * Does nothing once the sender is unbounded or when {@code n} fails
         * {@code Operators.validate}.
         *
         * @param n the number of additional elements requested
         */
        @Override
        public final void request(long n) {
            if (Operators.validate(n)) {
                if (this.unbounded) {
                    return;
                }
                // Fast path: no concurrent work, take the WIP guard and update demand directly.
                if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                    long r = this.requested;
                    if (r != Long.MAX_VALUE) {
                        this.requested = r = Operators.addCap(r, n);
                        // Demand saturated: switch to unbounded mode for good.
                        if (r == Long.MAX_VALUE) {
                            this.unbounded = true;
                        }
                    }
                    Subscription a = this.actual;
                    // Release the guard; if signals were missed in the meantime, process them.
                    if (WIP.decrementAndGet(this) != 0) {
                        this.drainLoop();
                    }
                    if (a != null) {
                        a.request(n);
                    }
                    return;
                }
                // Slow path: record the demand for whoever holds the guard.
                Operators.addCap(MISSED_REQUESTED, this, n);
                this.drain();
            }
        }

        /**
         * Enters the drain loop unless another thread is already running it; in that
         * case the increment of {@code wip} tells the running loop to go around again.
         */
        final void drain() {
            if (WIP.getAndIncrement(this) == 0) {
                this.drainLoop();
            }
        }

        /**
         * Processes signals recorded while the WIP guard was held elsewhere: a missed
         * subscription (or cancellation marker), missed demand and missed production.
         * Must be called while holding the guard. The accumulated upstream request is
         * issued once, after the guard has been released.
         */
        final void drainLoop() {
            int missed = 1;
            long requestAmount = 0L;
            Subscription requestTarget = null;
            do {
                long mp;
                long mr;
                Subscription ms;
                // A cancelled-subscription marker (installed by accept) means "cancel the current send".
                if ((ms = this.missedSubscription) != null && (ms = (Subscription)MISSED_SUBSCRIPTION.getAndSet(this, null)) == Operators.cancelledSubscription()) {
                    this.parent.innerActive = false;
                    Subscription a = this.actual;
                    if (a != null) {
                        a.cancel();
                        this.actual = null;
                        // Swap in the void promise so later completions do not touch the old one.
                        this.promise = this.parent.ctx.voidPromise();
                    }
                }
                // Atomically take the demand/production recorded by the slow paths.
                if ((mr = this.missedRequested) != 0L) {
                    mr = MISSED_REQUESTED.getAndSet(this, 0L);
                }
                if ((mp = this.missedProduced) != 0L) {
                    mp = MISSED_PRODUCED.getAndSet(this, 0L);
                }
                Subscription a = this.actual;
                if (this.inactive) {
                    // Sender was cancelled: cancel both the current and any newly arrived subscription.
                    if (a != null) {
                        a.cancel();
                        this.actual = null;
                        this.promise = this.parent.ctx.voidPromise();
                    }
                    if (ms == null) continue;
                    ms.cancel();
                    continue;
                }
                // Fold missed demand and production into the outstanding demand.
                long r = this.requested;
                if (r != Long.MAX_VALUE) {
                    long u = Operators.addCap(r, mr);
                    if (u != Long.MAX_VALUE) {
                        long v = u - mp;
                        if (v < 0L) {
                            Operators.reportMoreProduced();
                            v = 0L;
                        }
                        r = v;
                    } else {
                        r = u;
                    }
                    this.requested = r;
                }
                if (ms != null) {
                    // New subscription: install it and replay the whole outstanding demand to it.
                    this.actual = ms;
                    if (r == 0L) continue;
                    requestAmount = Operators.addCap(requestAmount, r);
                    requestTarget = ms;
                    continue;
                }
                // Same subscription: only the newly missed demand needs forwarding.
                if (mr == 0L || a == null) continue;
                requestAmount = Operators.addCap(requestAmount, mr);
                requestTarget = a;
            } while ((missed = WIP.addAndGet(this, -missed)) != 0);
            // requestTarget is assigned wherever requestAmount is increased above.
            if (requestAmount != 0L) {
                requestTarget.request(requestAmount);
            }
        }

        /**
         * Subtracts {@code n} produced elements from the outstanding demand. Does
         * nothing once the sender is unbounded.
         *
         * @param n the number of elements produced since the last report
         */
        final void produced(long n) {
            if (this.unbounded) {
                return;
            }
            // Fast path: no concurrent work, take the WIP guard and update demand directly.
            if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                long r = this.requested;
                if (r != Long.MAX_VALUE) {
                    long u = r - n;
                    if (u < 0L) {
                        // More was produced than requested: report it and clamp to zero.
                        Operators.reportMoreProduced();
                        u = 0L;
                    }
                    this.requested = u;
                } else {
                    this.unbounded = true;
                }
                // Release the guard; if signals were missed in the meantime, process them.
                if (WIP.decrementAndGet(this) == 0) {
                    return;
                }
                this.drainLoop();
                return;
            }
            // Slow path: record the production for whoever holds the guard.
            Operators.addCap(MISSED_PRODUCED, this, n);
            this.drain();
        }
    }
}

