/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.rest.stream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

public abstract class CacheChunkedStream<T> {
    protected static final Log logger = LogFactory.getLog(CacheChunkedStream.class);
    private static final int CHUNK_SIZE = 8192;
    protected final Publisher<T> publisher;

    public CacheChunkedStream(Publisher<T> publisher) {
        this.publisher = publisher;
    }

    public static byte[] readContentAsBytes(Object content) {
        if (content instanceof byte[]) {
            return (byte[])content;
        }
        if (content instanceof WrappedByteArray) {
            return ((WrappedByteArray)content).getBytes();
        }
        return content.toString().getBytes(StandardCharsets.UTF_8);
    }

    public abstract void subscribe(ChannelHandlerContext var1);

    static abstract class ByteBufSubscriber<T>
    extends DefaultSubscriber<T> {
        protected final ChannelHandlerContext ctx;
        protected final ByteBufAllocator allocator;
        protected final GenericFutureListener<Future<Void>> ERROR_LISTENER = f -> {
            try {
                f.get();
            }
            catch (Throwable t) {
                this.onError(t);
            }
        };
        private boolean firstEntry = true;
        private ByteBuf pendingBuffer;

        protected ByteBufSubscriber(ChannelHandlerContext ctx, ByteBufAllocator allocator) {
            this.ctx = Objects.requireNonNull(ctx);
            this.allocator = Objects.requireNonNull(allocator);
        }

        protected ByteBuf newByteBuf() {
            return this.allocator.buffer(2048);
        }

        protected void onStart() {
            ByteBuf buf = this.newByteBuf();
            buf.writeByte(91);
            this.pendingBuffer = buf;
            this.request(1L);
        }

        public void onNext(T item) {
            ByteBuf pendingBuf = this.pendingBuffer;
            if (!this.firstEntry) {
                pendingBuf.writeByte(44);
            } else {
                this.firstEntry = false;
            }
            this.writeItem(item, pendingBuf);
            if (pendingBuf.writerIndex() > 8192) {
                this.writeToContext(pendingBuf, false).addListener(f -> {
                    try {
                        f.get();
                        this.request(1L);
                    }
                    catch (Throwable t) {
                        this.onError(t);
                    }
                });
                this.pendingBuffer = this.newByteBuf();
            } else {
                assert (pendingBuf.writableBytes() > 0);
                this.request(1L);
            }
        }

        abstract void writeItem(T var1, ByteBuf var2);

        public void onError(Throwable t) {
            logger.error((Object)"Error encountered while streaming cache chunk", t);
            if (this.pendingBuffer != null) {
                this.pendingBuffer.release();
                this.pendingBuffer = null;
            }
            this.cancel();
            this.ctx.close();
        }

        public void onComplete() {
            ByteBuf buf = this.pendingBuffer;
            buf.writeByte(93);
            this.writeToContext(buf, true).addListener(this.ERROR_LISTENER);
            this.pendingBuffer = null;
        }

        ChannelFuture writeToContext(ByteBuf buf, boolean isComplete) {
            ChannelFuture completeFuture = this.ctx.write((Object)new DefaultHttpContent(buf));
            if (isComplete) {
                completeFuture = this.ctx.write((Object)LastHttpContent.EMPTY_LAST_CONTENT);
            }
            this.ctx.flush();
            return completeFuture;
        }
    }
}

