/*
 * Decompiled with CFR 0.152.
 */
package ratpack.core.server.internal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import ratpack.core.server.internal.ResponseBodyWriter;
import ratpack.core.server.internal.ResponseWritingListener;

/**
 * Writes a response body by streaming {@link ByteBuf} chunks from a {@link Publisher} to a channel.
 *
 * <p>Items are requested from the publisher one at a time, and only while the channel reports
 * itself writable; {@link #onWritable()} resumes requesting once the channel becomes writable again.
 * Each non-empty chunk is written as a {@link DefaultHttpContent}; when the publisher completes,
 * {@link LastHttpContent#EMPTY_LAST_CONTENT} is written together with the promise returned from
 * {@link #write(Channel)}.
 *
 * <p>The state fields are plain, non-volatile fields with no locking.
 * NOTE(review): presumably every callback runs on the channel's event loop - confirm with callers.
 */
class StreamingResponseBodyWriter implements ResponseBodyWriter, ResponseWritingListener {
    private final Publisher<? extends ByteBuf> publisher;
    /** Set once the stream has completed, failed, or the channel was closed; nothing is written afterwards. */
    private boolean done;
    /** The accepted upstream subscription, or {@code null} until {@code onSubscribe} has accepted one. */
    private Subscription subscription;
    /** Created by {@link #write(Channel)}; {@code null} before that call. */
    private ChannelPromise channelPromise;

    /**
     * @param publisher the source of body chunks; subscribed to when {@link #write(Channel)} is called
     */
    public StreamingResponseBodyWriter(Publisher<? extends ByteBuf> publisher) {
        this.publisher = publisher;
    }

    /**
     * Stops streaming: cancels the upstream subscription (if one has been accepted) and marks the
     * write promise as succeeded. Does nothing if the writer is already done.
     */
    @Override
    public void onClosed() {
        if (this.done) {
            return;
        }
        this.done = true;
        if (this.subscription != null) {
            this.subscription.cancel();
        }
        // The promise only exists once write() has run; a close notification that arrives
        // earlier has nothing to complete and must not fail with a NullPointerException.
        if (this.channelPromise != null) {
            this.channelPromise.setSuccess();
        }
    }

    /**
     * Requests one more chunk from the publisher, provided streaming is still in progress and a
     * subscription has been accepted.
     */
    @Override
    public void onWritable() {
        if (this.done || this.subscription == null) {
            return;
        }
        this.subscription.request(1L);
    }

    /**
     * Starts streaming the publisher's chunks to the given channel.
     *
     * @param channel the channel to write to
     * @return a promise created from {@code channel}; it is failed on a publisher error, marked
     *         succeeded when the channel is closed first, and otherwise passed to the channel with
     *         the terminating {@link LastHttpContent#EMPTY_LAST_CONTENT} write
     */
    public ChannelPromise write(Channel channel) {
        this.channelPromise = channel.newPromise();
        this.publisher.subscribe(new Subscriber(channel));
        return this.channelPromise;
    }

    /**
     * Bridges publisher signals onto the channel, sharing the enclosing writer's state.
     */
    private class Subscriber implements org.reactivestreams.Subscriber<ByteBuf> {
        private final Channel channel;

        public Subscriber(Channel channel) {
            this.channel = channel;
        }

        /**
         * Accepts the first subscription and requests the first chunk if the channel is writable.
         * Any additional subscription is cancelled, as is one that arrives after the writer is done.
         *
         * @throws NullPointerException if {@code incomingSubscription} is {@code null}
         */
        @Override
        public void onSubscribe(Subscription incomingSubscription) {
            if (incomingSubscription == null) {
                throw new NullPointerException("'subscription' is null");
            }
            if (StreamingResponseBodyWriter.this.subscription != null) {
                incomingSubscription.cancel();
                return;
            }
            if (StreamingResponseBodyWriter.this.done) {
                // The channel closed before the publisher handed over its subscription, so
                // onClosed() had nothing to cancel. Cancel here so the upstream is not left
                // dangling, and make sure the promise from write() does not stay pending.
                incomingSubscription.cancel();
                StreamingResponseBodyWriter.this.channelPromise.trySuccess();
                return;
            }
            StreamingResponseBodyWriter.this.subscription = incomingSubscription;
            if (this.channel.isWritable()) {
                incomingSubscription.request(1L);
            }
        }

        /**
         * Writes and flushes a non-empty chunk, then requests the next one if the channel is still
         * writable. Empty chunks are released and skipped; chunks arriving after the writer is done
         * are released without being written.
         */
        @Override
        public void onNext(ByteBuf o) {
            o.touch();
            // Check 'done' before anything else so that no further demand is signalled on a
            // subscription that has already been cancelled or terminated.
            if (StreamingResponseBodyWriter.this.done) {
                o.release();
                return;
            }
            if (o.readableBytes() == 0) {
                // Nothing to write; drop the buffer and ask for the next chunk straight away.
                o.release();
                StreamingResponseBodyWriter.this.subscription.request(1L);
                return;
            }
            this.channel.writeAndFlush(new DefaultHttpContent(o.touch()), this.channel.voidPromise());
            if (this.channel.isWritable()) {
                StreamingResponseBodyWriter.this.subscription.request(1L);
            }
            // Otherwise demand is resumed from onWritable().
        }

        /**
         * Fails the write promise with the publisher's error, unless the writer is already done.
         *
         * @throws NullPointerException if {@code t} is {@code null}
         */
        @Override
        public void onError(Throwable t) {
            if (t == null) {
                throw new NullPointerException("error is null");
            }
            if (StreamingResponseBodyWriter.this.done) {
                return;
            }
            StreamingResponseBodyWriter.this.done = true;
            StreamingResponseBodyWriter.this.channelPromise.setFailure(t);
        }

        /**
         * Terminates the body by writing {@link LastHttpContent#EMPTY_LAST_CONTENT} with the write
         * promise and flushing, unless the writer is already done.
         */
        @Override
        public void onComplete() {
            if (StreamingResponseBodyWriter.this.done) {
                return;
            }
            StreamingResponseBodyWriter.this.done = true;
            this.channel.write(LastHttpContent.EMPTY_LAST_CONTENT, StreamingResponseBodyWriter.this.channelPromise);
            this.channel.flush();
        }
    }
}

