/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.web.sse;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.noear.solon.core.handle.Context;
import org.noear.solon.core.handle.ContextAsyncListener;
import org.noear.solon.web.sse.AsyncListenerImpl;
import org.noear.solon.web.sse.SseEmitter;
import org.noear.solon.web.sse.SseEvent;

/**
 * Connects an {@link SseEmitter} to the {@link Context} that was current when the
 * handler was constructed and writes built {@link SseEvent}s to that context.
 *
 * <p>Two delivery modes are used, selected by {@code ctx.asyncSupported()}:
 * <ul>
 *   <li>async supported: {@link #send(SseEvent)} writes and flushes directly;</li>
 *   <li>otherwise: {@link #send(SseEvent)} enqueues the built event and the thread that
 *       called {@link #start()} writes queued events until the handler is stopped.</li>
 * </ul>
 */
public class SseEmitterHandler {
    /** Maximum time the synchronous send loop waits for an event before re-checking the stop flag. */
    private static final long SYNC_POLL_MILLIS = 500L;

    private final SseEmitter emitter;
    private final Context ctx;
    /** Built events waiting to be written by the synchronous send loop. */
    private final LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
    /** Set to {@code true} exactly once, by the first successful {@link #stop()}. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /**
     * Creates a handler bound to {@code Context.current()}.
     *
     * @param emitter the emitter whose timeout and callbacks this handler uses
     */
    public SseEmitterHandler(SseEmitter emitter) {
        this.ctx = Context.current();
        this.emitter = emitter;
    }

    /**
     * Sets the event-stream content type and starts delivery.
     *
     * <p>When async is supported, async processing is started with the emitter's timeout and
     * the emitter is initialized. Otherwise the emitter is initialized and this call then runs
     * the synchronous send loop, returning only after the handler has been stopped.
     *
     * @throws Throwable if initialization or writing to the context fails
     */
    public void start() throws Throwable {
        this.ctx.contentType("text/event-stream;charset=utf-8");
        if (this.ctx.asyncSupported()) {
            this.ctx.asyncStart(this.emitter.timeout, new AsyncListenerImpl(this));
            this.emitter.initialize(this);
        } else {
            this.emitter.initialize(this);
            this.executeSyncSendTask();
        }
    }

    /**
     * Writes queued events until the handler is stopped, then flushes whatever is still queued.
     *
     * @throws Throwable if writing fails or the waiting thread is interrupted
     */
    private void executeSyncSendTask() throws Throwable {
        while (!this.stopped.get()) {
            // Timed poll: wake as soon as an event arrives, but still re-check the
            // stop flag at least every SYNC_POLL_MILLIS.
            String event = this.q.poll(SYNC_POLL_MILLIS, TimeUnit.MILLISECONDS);
            if (event != null) {
                this.writeAndFlush(event);
            }
        }
        // Events enqueued before the stop flag was observed must not be dropped.
        String pending;
        while ((pending = this.q.poll()) != null) {
            this.writeAndFlush(pending);
        }
    }

    /** Writes one already-built event to the context and flushes it. */
    private void writeAndFlush(String payload) throws IOException {
        this.ctx.output(payload);
        this.ctx.flush();
    }

    /**
     * Sends an event: written immediately when async is supported, otherwise enqueued for
     * the synchronous send loop.
     *
     * @param event the event to send; must not be {@code null}
     * @throws IllegalArgumentException if {@code event} is {@code null}
     * @throws IllegalStateException if the handler has already been stopped
     * @throws IOException if writing fails or enqueueing is interrupted; the handler is
     *         stopped via {@link #stopOnError(Throwable)} before the exception is rethrown
     */
    public synchronized void send(SseEvent event) throws IOException {
        if (event == null) {
            throw new IllegalArgumentException("SSE event cannot be null");
        }
        if (this.stopped.get()) {
            throw new IllegalStateException("SSE emitter was not initialized or stopped");
        }
        try {
            if (this.ctx.asyncSupported()) {
                this.writeAndFlush(event.build());
            } else {
                try {
                    this.q.put(event.build());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            }
        } catch (IOException e) {
            this.stopOnError(e);
            throw e;
        }
    }

    /** Stops the handler; equivalent to {@link #stop()}. */
    public void complete() {
        this.stop();
    }

    /**
     * Invokes the emitter's {@code onError} callback, if set, and then stops the handler.
     * The handler is stopped even if the callback throws.
     *
     * @param e the failure passed to the callback
     */
    protected void stopOnError(Throwable e) {
        try {
            if (this.emitter.onError != null) {
                this.emitter.onError.accept(e);
            }
        } finally {
            this.stop();
        }
    }

    /**
     * Invokes the emitter's {@code onTimeout} callback, if set, and then stops the handler.
     * The handler is stopped even if the callback throws.
     */
    protected void stopOnTimeout() {
        try {
            if (this.emitter.onTimeout != null) {
                this.emitter.onTimeout.run();
            }
        } finally {
            this.stop();
        }
    }

    /**
     * Marks the handler as stopped, runs the emitter's {@code onCompletion} callback if set,
     * and completes async processing when async is supported. Only the first call has any
     * effect; later calls return immediately.
     */
    protected void stop() {
        // Atomic test-and-set so concurrent callers cannot both run the completion logic.
        if (!this.stopped.compareAndSet(false, true)) {
            return;
        }
        try {
            if (this.emitter.onCompletion != null) {
                this.emitter.onCompletion.run();
            }
        } finally {
            // Complete the async request even when the callback throws.
            if (this.ctx.asyncSupported()) {
                this.ctx.asyncComplete();
            }
        }
    }
}

