/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.handler;

import io.micronaut.core.annotation.Internal;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bridges a blocking {@link InputStream} to a pull-style writer.
 *
 * <p>A worker task submitted to {@code blockingExecutor} reads the stream in chunks of up to
 * {@link #CHUNK_SIZE} bytes and parks them in a queue that it keeps at no more than
 * {@link #QUEUE_SIZE} entries. {@link #writeSome()} takes buffers off that queue and hands them
 * to the subclass through {@link #writeData(ByteBuf)}, finishing with {@link #writeLast()} once
 * the worker is done and the queue is empty.
 *
 * <p>The queue and the {@code workerReady}/{@code done}/{@code producerWaiting}/
 * {@code consumerWaiting} flags are only touched while holding this object's monitor.
 * {@code worker} is accessed, and {@code discard} is written, without it.
 * NOTE(review): this presumably relies on {@link #writeSome()} and {@link #discard()} always
 * being called from one and the same thread -- confirm against the subclasses/callers.
 */
@Internal
abstract class BlockingWriter {
    /** Maximum number of chunks the worker queues before it waits for the consumer. */
    static final int QUEUE_SIZE = 2;
    /** Size in bytes of each buffer allocated for, and maximum bytes read per, stream read. */
    static final int CHUNK_SIZE = 64 * 1024;
    private static final Logger LOG = LoggerFactory.getLogger(BlockingWriter.class);

    private final ByteBufAllocator alloc;
    private final InputStream stream;
    private final ExecutorService blockingExecutor;
    private final Queue<ByteBuf> queue = new ArrayDeque<>(QUEUE_SIZE);

    /** Handle of the submitted {@link #work()} task; {@code null} until the first submit. */
    private Future<?> worker = null;
    /** Set by the worker as its very first action, before it reads anything. */
    private boolean workerReady = false;
    /** Set by {@link #discard()}; once set, no further data is queued or announced. */
    private boolean discard = false;
    /** Set when the worker hit end of stream or has exited for any other reason. */
    private boolean done = false;
    /** The worker is (about to be) blocked in {@code wait()} because the queue is full. */
    private boolean producerWaiting = false;
    /** {@link #writeSome()} found nothing to write and expects a {@link #writeSomeAsync()} wake-up. */
    private boolean consumerWaiting = false;

    /**
     * @param alloc            allocator used for the per-chunk heap buffers
     * @param stream           source to read; closed by the worker task when it finishes
     * @param blockingExecutor executor the blocking read loop is submitted to
     */
    BlockingWriter(ByteBufAllocator alloc, InputStream stream, ExecutorService blockingExecutor) {
        this.alloc = alloc;
        this.stream = stream;
        this.blockingExecutor = blockingExecutor;
    }

    /**
     * Called once from {@link #writeSome()}, immediately before the worker task is submitted.
     */
    protected abstract void writeStart();

    /**
     * Receives the next chunk taken off the queue. This class does not touch the buffer again
     * after handing it over.
     * NOTE(review): the implementation presumably takes over releasing {@code buf} -- confirm.
     *
     * @param buf the chunk to write
     * @return {@code true} if {@link #writeSome()} should keep handing over chunks right away;
     *         {@code false} makes it return without registering for a wake-up, so the
     *         implementation is presumably expected to call {@link #writeSome()} again itself
     *         -- confirm against subclasses
     */
    protected abstract boolean writeData(ByteBuf buf);

    /**
     * Called from {@link #writeSome()} when the queue is empty and the worker is done.
     */
    protected abstract void writeLast();

    /**
     * Called by the worker (while it holds this object's monitor) when data or completion became
     * available and {@link #writeSome()} had previously found nothing to do.
     * NOTE(review): presumably schedules a {@link #writeSome()} call on the consumer's thread
     * rather than running it inline -- confirm against subclasses.
     */
    protected abstract void writeSomeAsync();

    /**
     * Writes as many queued chunks as the subclass accepts.
     *
     * <p>On the first call this invokes {@link #writeStart()} and submits the worker. It then
     * loops until one of three things happens: the queue is empty and the worker is not done
     * (registers for a {@link #writeSomeAsync()} wake-up), the queue is empty and the worker is
     * done ({@link #writeLast()}), or {@link #writeData(ByteBuf)} returns {@code false}.
     */
    final void writeSome() {
        if (worker == null) {
            writeStart();
            worker = blockingExecutor.submit(this::work);
        }
        while (true) {
            ByteBuf msg;
            synchronized (this) {
                // We are about to free a queue slot (or the queue is already empty), so let a
                // producer that is blocked on a full queue re-check its condition.
                if (producerWaiting) {
                    producerWaiting = false;
                    notifyAll();
                }
                msg = queue.poll();
                if (msg == null && !done) {
                    // Nothing available yet: ask the worker to call writeSomeAsync() later.
                    consumerWaiting = true;
                    return;
                }
            }
            // The subclass callbacks are invoked outside the monitor.
            if (msg == null) {
                // Queue empty and worker done.
                writeLast();
                return;
            }
            if (!writeData(msg)) {
                return;
            }
        }
    }

    /**
     * Abandons the transfer: no more chunks are queued or announced, and queued chunks are
     * released.
     *
     * <p>If the worker was never started it is submitted anyway; with {@code discard} already
     * set, {@link #work()} returns right after opening its try-with-resources block, which
     * closes the stream. If the worker has already marked itself ready, it is cancelled with
     * interruption and the queue is drained here. If it was submitted but has not started yet,
     * nothing more is done here: {@link #work()} checks {@code discard} on entry and drains on
     * exit.
     */
    void discard() {
        discard = true;
        if (worker == null) {
            worker = blockingExecutor.submit(this::work);
            return;
        }
        synchronized (this) {
            if (workerReady) {
                worker.cancel(true);
                drain();
            }
        }
    }

    /**
     * Body of the worker task: reads the stream chunk by chunk into the queue until end of
     * stream, discard, interruption or a read error. The stream is closed and any buffer not yet
     * handed over is released on every exit path.
     */
    private void work() {
        // Non-null only while this method still owns the buffer (allocated but not yet queued).
        ByteBuf buf = null;
        try (InputStream in = this.stream) {
            synchronized (this) {
                workerReady = true;
                if (discard) {
                    return;
                }
            }
            while (true) {
                buf = alloc.heapBuffer(CHUNK_SIZE);
                // Blocking read, deliberately performed outside the monitor.
                int n = buf.writeBytes(in, CHUNK_SIZE);
                synchronized (this) {
                    if (n == -1) {
                        // End of stream; the unused buffer is released in the finally block.
                        done = true;
                        wakeConsumer();
                        return;
                    }
                    // Wait until writeSome() has taken something off a full queue.
                    while (queue.size() >= QUEUE_SIZE && !discard) {
                        producerWaiting = true;
                        wait();
                    }
                    if (discard) {
                        return;
                    }
                    queue.add(buf);
                    buf = null; // ownership moved to the queue
                    wakeConsumer();
                }
            }
        } catch (InterruptedIOException | InterruptedException ignored) {
            // discard() stops a running worker through cancel(true), so an interruption is not
            // logged. Cleanup happens in the finally block.
        } catch (Exception e) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("InputStream threw an error during read. This error cannot be forwarded to the client. Please make sure any errors are thrown by the controller instead.", e);
            }
        } finally {
            if (buf != null) {
                buf.release();
            }
            synchronized (this) {
                done = true;
                if (discard) {
                    drain();
                }
            }
        }
    }

    /**
     * Triggers {@link #writeSomeAsync()} if {@link #writeSome()} registered for a wake-up and the
     * transfer has not been discarded. Must be called with this object's monitor held.
     */
    private void wakeConsumer() {
        assert Thread.holdsLock(this);
        if (!discard && consumerWaiting) {
            consumerWaiting = false;
            writeSomeAsync();
        }
    }

    /**
     * Releases and removes every queued chunk. Must be called with this object's monitor held.
     */
    private void drain() {
        assert Thread.holdsLock(this);
        ByteBuf queued;
        while ((queued = queue.poll()) != null) {
            queued.release();
        }
    }
}

