/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.web.sse;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.noear.solon.core.Lifecycle;
import org.noear.solon.core.event.EventBus;
import org.noear.solon.core.handle.Context;
import org.noear.solon.web.sse.SseEvent;

public class SseEmitter
implements Lifecycle {
    private final Context ctx;
    private final LinkedBlockingQueue<String> q = new LinkedBlockingQueue();
    private final AtomicBoolean b = new AtomicBoolean(false);
    private Runnable onCompletion;
    private Consumer<Throwable> onError;
    private final long keepAlive;

    public SseEmitter onCompletion(Runnable onCompletion) {
        this.onCompletion = onCompletion;
        return this;
    }

    public SseEmitter onError(Consumer<Throwable> onError) {
        this.onError = onError;
        return this;
    }

    public SseEmitter(long keepAlive) {
        this.ctx = Context.current();
        this.keepAlive = keepAlive > 0L ? keepAlive : 60L;
        this.internalSendHeader();
    }

    public void send(String data) throws IOException {
        this.send(new SseEvent().data(data));
    }

    public void send(SseEvent event) throws IOException {
        if (event == null) {
            return;
        }
        if (this.b.get()) {
            return;
        }
        if (this.ctx.asyncSupported()) {
            this.internalSendEvent(event.build());
        } else {
            try {
                this.q.put(event.build());
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void executeSendTask() throws Throwable {
        while (!this.b.get()) {
            String event = this.q.poll();
            if (event != null) {
                this.internalSendEvent(event);
            }
            Thread.sleep(1000L);
        }
    }

    private void internalSendHeader() {
        this.ctx.headerSet("Content-Type", "text/event-stream");
        this.ctx.headerSet("Cache-Control", "no-cache");
        this.ctx.headerSet("Connection", "keep-alive");
        this.ctx.headerSet("Keep-Alive", "timeout=" + this.keepAlive);
    }

    private void internalSendEvent(String event) throws IOException {
        this.ctx.output(event);
        this.ctx.flush();
    }

    public void start() throws Throwable {
        try {
            if (this.ctx.asyncSupported()) {
                this.ctx.asyncStart();
            } else {
                this.executeSendTask();
            }
        }
        catch (Throwable e) {
            if (this.onError != null) {
                this.onError.accept(e);
            }
            EventBus.pushTry((Object)e);
        }
    }

    public void stop() throws Throwable {
        if (this.onCompletion != null) {
            this.onCompletion.run();
        }
        this.b.set(true);
        this.ctx.asyncComplete();
    }
}

