/*
 * Decompiled with CFR 0.152.
 */
package ratpack.core.sse;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.core.handling.Context;
import ratpack.core.http.Response;
import ratpack.core.http.internal.HttpHeaderConstants;
import ratpack.core.render.Renderable;
import ratpack.core.sse.ServerSentEvent;
import ratpack.core.sse.ServerSentEventsBuilder;
import ratpack.core.sse.internal.Clock;
import ratpack.core.sse.internal.ServerSentEventEncoder;
import ratpack.core.sse.internal.ServerSentEventStreamBuffer;
import ratpack.core.sse.internal.ServerSentEventStreamBufferSettings;
import ratpack.core.sse.internal.ServerSentEventStreamKeepAlive;
import ratpack.exec.stream.Streams;
import ratpack.exec.stream.TransformablePublisher;
import ratpack.func.Nullable;

/**
 * A {@link Renderable} that streams a publisher of {@link ServerSentEvent} objects to the response.
 *
 * <p>Instances are created via {@link #builder()}. Depending on how the builder was configured,
 * the encoded event stream may additionally be wrapped in a {@link ServerSentEventStreamBuffer}
 * and/or a {@link ServerSentEventStreamKeepAlive} before it is sent.
 */
public class ServerSentEvents
implements Renderable {
    private final Publisher<? extends ServerSentEvent> publisher;
    private final boolean noContentOnEmpty;
    @Nullable
    private final Duration heartbeatFrequency;
    @Nullable
    private final ServerSentEventStreamBufferSettings bufferSettings;

    /**
     * Creates a new builder for configuring and creating a {@link ServerSentEvents}.
     *
     * @return a new builder
     */
    public static ServerSentEventsBuilder builder() {
        return new BuilderImpl();
    }

    private ServerSentEvents(Publisher<? extends ServerSentEvent> publisher, boolean noContentOnEmpty, @Nullable Duration heartbeatFrequency, @Nullable ServerSentEventStreamBufferSettings bufferSettings) {
        this.publisher = publisher;
        this.noContentOnEmpty = noContentOnEmpty;
        this.heartbeatFrequency = heartbeatFrequency;
        this.bufferSettings = bufferSettings;
    }

    /**
     * Adds the cache-control and pragma headers to the response and then streams the events.
     *
     * <p>When the "no content on empty" option was enabled, the first signal of the publisher is
     * inspected before the response is committed; otherwise the stream is sent straight away.
     *
     * @param context the context of the request being handled
     * @throws Exception declared by {@link Renderable#render(Context)}
     */
    @Override
    public void render(Context context) throws Exception {
        Response response = context.getResponse();
        response.getHeaders().add(HttpHeaderConstants.CACHE_CONTROL, HttpHeaderConstants.NO_CACHE_FULL);
        response.getHeaders().add(HttpHeaderConstants.PRAGMA, HttpHeaderConstants.NO_CACHE);
        if (this.noContentOnEmpty) {
            this.renderWithNoContentOnEmpty(context);
        } else {
            this.renderStream(context, this.publisher);
        }
    }

    /**
     * Subscribes to the publisher and requests a single item in order to decide how to respond.
     *
     * <ul>
     *   <li>Completion before any item: {@link #emptyStream(Context)} is invoked.</li>
     *   <li>Error before any item: the error is forwarded to {@link Context#error(Throwable)}.</li>
     *   <li>An item: the response is rendered as a stream made of that item followed by the
     *       remainder of the upstream; all later signals are relayed to the downstream subscriber.</li>
     * </ul>
     */
    private void renderWithNoContentOnEmpty(final Context context) {
        this.publisher.subscribe(new Subscriber<ServerSentEvent>() {
            private Subscription subscription;
            // Stays null until the response stream subscribes to the "rest" publisher created in onNext.
            private Subscriber<? super ServerSentEvent> subscriber;

            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                // Ask for exactly one item so that emptiness can be detected before rendering.
                this.subscription.request(1L);
            }

            @Override
            public void onNext(ServerSentEvent event) {
                if (this.subscriber == null) {
                    // The item just received is replayed ahead of the remaining upstream items.
                    TransformablePublisher<ServerSentEvent> consumedPublisher = Streams.publish(Collections.singleton(event));
                    // Hands the already established upstream subscription over to the downstream
                    // subscriber, so its demand is forwarded directly to the original publisher.
                    Publisher<ServerSentEvent> restPublisher = s -> {
                        this.subscriber = s;
                        s.onSubscribe(Objects.requireNonNull(this.subscription));
                    };
                    ServerSentEvents.this.renderStream(context, Streams.concat(Arrays.asList(consumedPublisher, restPublisher)));
                } else {
                    this.subscriber.onNext(event);
                }
            }

            @Override
            public void onError(Throwable t) {
                if (this.subscriber == null) {
                    context.error(t);
                } else {
                    this.subscriber.onError(t);
                }
            }

            @Override
            public void onComplete() {
                if (this.subscriber == null) {
                    ServerSentEvents.emptyStream(context);
                } else {
                    this.subscriber.onComplete();
                }
            }
        });
    }

    /**
     * Adds the content-type and transfer-encoding headers, encodes each event to a {@link ByteBuf}
     * and sends the resulting stream as the response body.
     *
     * <p>The encoded stream is wrapped in a {@link ServerSentEventStreamBuffer} when buffer settings
     * are present, and then in a {@link ServerSentEventStreamKeepAlive} when a heartbeat frequency
     * is present. Both use the channel's event loop as their scheduler.
     *
     * @param context the context of the request being handled
     * @param events the events to encode and send
     */
    private void renderStream(Context context, Publisher<? extends ServerSentEvent> events) {
        Response response = context.getResponse();
        response.getHeaders().add(HttpHeaderConstants.CONTENT_TYPE, HttpHeaderConstants.TEXT_EVENT_STREAM_CHARSET_UTF_8);
        response.getHeaders().add(HttpHeaderConstants.TRANSFER_ENCODING, HttpHeaderConstants.CHUNKED);
        ByteBufAllocator byteBufAllocator = context.getDirectChannelAccess().getChannel().alloc();
        Publisher<? extends ByteBuf> buffers = Streams.map(events, i -> ServerSentEventEncoder.INSTANCE.encode(i, byteBufAllocator));
        EventLoop executor = context.getDirectChannelAccess().getChannel().eventLoop();
        Clock clock = System::nanoTime;
        if (this.bufferSettings != null) {
            buffers = new ServerSentEventStreamBuffer(buffers, executor, byteBufAllocator, this.bufferSettings, clock);
        }
        if (this.heartbeatFrequency != null) {
            buffers = new ServerSentEventStreamKeepAlive(buffers, executor, this.heartbeatFrequency, clock);
        }
        response.sendStream(buffers);
    }

    /**
     * Sends a bodiless response with the {@link HttpResponseStatus#NO_CONTENT} status code.
     */
    private static void emptyStream(Context ctx) {
        ctx.getResponse().status(HttpResponseStatus.NO_CONTENT.code()).send();
    }

    /**
     * Default {@link ServerSentEventsBuilder} implementation; collects the options and hands them
     * to the {@link ServerSentEvents} constructor in {@link #build(Publisher)}.
     */
    private static class BuilderImpl
    implements ServerSentEventsBuilder {
        private boolean noContentOnEmpty;
        private ServerSentEventStreamBufferSettings bufferSettings;
        private Duration keepAliveHeartbeat;

        private BuilderImpl() {
        }

        /**
         * Enables buffering of the encoded stream with the given limits.
         *
         * @param numEvents the event count limit, must be greater than zero
         * @param numBytes the byte count limit, must be greater than zero
         * @param duration the time limit, must not be negative
         * @return this
         * @throws IllegalArgumentException if any argument is outside its allowed range
         */
        @Override
        public ServerSentEventsBuilder buffered(int numEvents, int numBytes, Duration duration) {
            // Invalid limits are rejected rather than merely reported, so that no buffer settings
            // are ever created from them; this matches the handling of the duration arguments.
            if (numEvents < 1) {
                throw new IllegalArgumentException("numEvents must be > 0");
            }
            if (numBytes < 1) {
                throw new IllegalArgumentException("numBytes must be > 0");
            }
            if (duration.isNegative()) {
                throw new IllegalArgumentException("duration must be zero or positive");
            }
            this.bufferSettings = new ServerSentEventStreamBufferSettings(numEvents, numBytes, duration);
            return this;
        }

        /**
         * Requests that an empty response with a "no content" status be sent when the publisher
         * completes without emitting any event.
         *
         * @return this
         */
        @Override
        public ServerSentEventsBuilder noContentOnEmpty() {
            this.noContentOnEmpty = true;
            return this;
        }

        /**
         * Enables keep-alive handling of the stream with the given idle duration.
         *
         * @param heartbeatAfterIdleFor the idle duration, must be strictly positive
         * @return this
         * @throws IllegalArgumentException if the duration is zero or negative
         */
        @Override
        public ServerSentEventsBuilder keepAlive(Duration heartbeatAfterIdleFor) {
            if (heartbeatAfterIdleFor.isNegative() || heartbeatAfterIdleFor.isZero()) {
                throw new IllegalArgumentException("duration must be positive");
            }
            this.keepAliveHeartbeat = heartbeatAfterIdleFor;
            return this;
        }

        /**
         * Creates a {@link ServerSentEvents} for the given events using the options collected so far.
         *
         * @param events the events to render
         * @return a renderable for the events
         */
        @Override
        public ServerSentEvents build(Publisher<? extends ServerSentEvent> events) {
            return new ServerSentEvents(events, this.noContentOnEmpty, this.keepAliveHeartbeat, this.bufferSettings);
        }
    }
}

