/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.client.netty;

import io.micronaut.buffer.netty.NettyReadBufferFactory;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.io.buffer.ReadBuffer;
import io.micronaut.http.body.stream.BufferConsumer;
import io.micronaut.http.body.stream.LazyUpstream;
import io.micronaut.http.netty.EventLoopFlow;
import io.micronaut.http.netty.body.StreamingNettyByteBody;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.OrderedEventExecutor;
import java.util.function.Consumer;

@Internal
final class StreamWriter
extends ChannelInboundHandlerAdapter
implements BufferConsumer {
    private final StreamingNettyByteBody body;
    private final Consumer<Throwable> errorHandler;
    private ChannelHandlerContext ctx;
    private EventLoopFlow flow;
    private BufferConsumer.Upstream upstream;
    private long unwritten = 0L;
    private boolean completed = false;

    StreamWriter(StreamingNettyByteBody body, Consumer<Throwable> errorHandler) {
        this.body = body;
        this.errorHandler = errorHandler;
    }

    void startWriting() {
        if (this.ctx == null) {
            throw new IllegalStateException("Not added to a channel yet");
        }
        LazyUpstream lazyUpstream = new LazyUpstream();
        this.upstream = lazyUpstream;
        this.upstream = this.body.primary((BufferConsumer)this);
        lazyUpstream.forward(this.upstream);
        try {
            this.upstream.start();
        }
        catch (Exception e) {
            this.errorHandler.accept(e);
        }
    }

    void cancel() {
        if (this.upstream != null) {
            this.upstream.allowDiscard();
            this.upstream.disregardBackpressure();
        }
        this.body.close();
    }

    boolean isCompleted() {
        return this.completed;
    }

    public void handlerAdded(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        this.flow = new EventLoopFlow((OrderedEventExecutor)ctx.channel().eventLoop());
    }

    public void add(ReadBuffer buf) {
        if (this.flow.executeNow(() -> this.add0(buf))) {
            this.add0(buf);
        }
    }

    private void add0(ReadBuffer buf) {
        if (this.ctx == null) {
            buf.close();
            return;
        }
        int readable = buf.readable();
        this.ctx.writeAndFlush((Object)new DefaultHttpContent(NettyReadBufferFactory.toByteBuf((ReadBuffer)buf))).addListener((GenericFutureListener)((ChannelFutureListener)future -> {
            assert (this.ctx.executor().inEventLoop());
            if (future.isSuccess()) {
                if (this.ctx.channel().isWritable()) {
                    this.upstream.onBytesConsumed((long)readable);
                } else {
                    this.unwritten += (long)readable;
                }
            } else {
                this.error(future.cause());
            }
        }));
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        long unwritten = this.unwritten;
        if (ctx.channel().isWritable() && unwritten != 0L) {
            this.unwritten = 0L;
            this.upstream.onBytesConsumed(unwritten);
        }
        super.channelWritabilityChanged(ctx);
    }

    public void complete() {
        if (this.flow.executeNow(this::complete0)) {
            this.complete0();
        }
    }

    private void complete0() {
        if (this.ctx == null) {
            return;
        }
        this.ctx.writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT, this.ctx.voidPromise());
        this.completed = true;
    }

    public void discard() {
    }

    public void error(Throwable e) {
        if (this.ctx == null) {
            return;
        }
        this.errorHandler.accept(e);
    }

    public void handlerRemoved(ChannelHandlerContext ctx) {
        this.cancel();
    }
}

