/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.net.http.textstream;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.HashMap;
import org.noear.solon.Solon;
import org.noear.solon.core.util.RunUtil;
import org.noear.solon.net.http.HttpResponse;
import org.noear.solon.net.http.textstream.CloseTrackableBufferedReader;
import org.noear.solon.net.http.textstream.ServerSentEvent;
import org.noear.solon.rx.SimpleSubscription;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Helpers that expose a text based body (plain lines or server-sent events)
 * as a Reactive Streams sequence.
 *
 * <p>Reading is pull based: nothing is read from the underlying stream until
 * the subscriber calls {@code request(n)} on the subscription it was handed,
 * and at most {@code n} items are emitted per request. The reader wrapping the
 * stream is closed when the stream ends, when an error occurs, or when a
 * cancelled subscription is observed on a later request.
 */
public class TextStreamUtil {
    /** Buffer size (in chars) given to the reader that wraps the incoming stream. */
    private static final int READER_BUFFER_SIZE = 1024;

    /** Field prefix that marks a data line inside an SSE block. */
    private static final String SSE_DATA_PREFIX = "data:";

    /** Data payload that is treated as an explicit end-of-stream marker. */
    private static final String SSE_DONE_MARKER = "[DONE]";

    /**
     * Streams the lines of {@code inputStream} to {@code subscriber} using the default charset.
     *
     * @param inputStream source of the text
     * @param subscriber  receiver of the lines
     * @throws IOException declared for backward compatibility only; failures are reported
     *                     through {@code subscriber.onError}
     * @deprecated use {@link #parseLineStream(InputStream, Charset, Subscriber)} instead
     */
    @Deprecated
    public static void parseTextStream(InputStream inputStream, Subscriber<? super String> subscriber) throws IOException {
        parseLineStream(inputStream, null, subscriber);
    }

    /**
     * Creates a publisher that emits the lines of {@code inputStream}, decoded with the
     * default charset ({@code Solon.encoding()}).
     *
     * <p>NOTE(review): every subscription wraps the same {@code inputStream}, so the
     * publisher is presumably meant to be subscribed once — confirm with callers.
     *
     * @param inputStream source of the text
     * @return a publisher of lines
     */
    public static Publisher<String> parseLineStream(InputStream inputStream) {
        return subscriber -> {
            try {
                parseLineStream(inputStream, null, subscriber);
            }
            catch (Exception e) {
                subscriber.onError(e);
            }
        };
    }

    /**
     * Creates a publisher that emits the lines of the response body, decoded with the
     * response's content encoding.
     *
     * @param response response whose body is read
     * @return a publisher of lines
     */
    public static Publisher<String> parseLineStream(HttpResponse response) {
        return subscriber -> {
            try {
                parseLineStream(response.body(), response.contentEncoding(), subscriber);
            }
            catch (Exception e) {
                subscriber.onError(e);
            }
        };
    }

    /**
     * Streams the lines of the response body to {@code subscriber}.
     *
     * @param response   response whose body is read
     * @param subscriber receiver of the lines
     */
    public static void parseLineStream(HttpResponse response, Subscriber<? super String> subscriber) {
        parseLineStream(response.body(), response.contentEncoding(), subscriber);
    }

    /**
     * Streams the lines of {@code inputStream} to {@code subscriber}.
     *
     * <p>The subscriber receives a subscription via {@code onSubscribe}; each
     * {@code request(n)} reads and emits up to {@code n} lines.
     *
     * @param inputStream source of the text
     * @param charset     charset used to decode the stream; {@code null} selects {@code Solon.encoding()}
     * @param subscriber  receiver of the lines
     */
    public static void parseLineStream(InputStream inputStream, Charset charset, Subscriber<? super String> subscriber) {
        CloseTrackableBufferedReader reader = newReader(inputStream, charset);
        subscriber.onSubscribe(new SimpleSubscription().onRequest((subscription, l) -> onLineStreamRequestDo(reader, subscriber, subscription, l)));
    }

    /**
     * Serves one {@code request(l)}: emits up to {@code l} lines, or terminates the sequence.
     */
    private static void onLineStreamRequestDo(CloseTrackableBufferedReader reader, Subscriber<? super String> subscriber, SimpleSubscription subscription, long l) {
        try {
            while (l > 0L) {
                if (subscription.isCancelled()) {
                    RunUtil.runAndTry(reader::close);
                    return;
                }
                if (reader.isClosed()) {
                    // The reader is only ever closed by this class, and always together with
                    // (or after) a terminal signal or a cancellation. Signalling onComplete
                    // here again would emit a second terminal signal, so just stop.
                    return;
                }
                String textLine = reader.readLine();
                if (textLine == null) {
                    // End of stream.
                    RunUtil.runAndTry(reader::close);
                    subscriber.onComplete();
                    return;
                }
                subscriber.onNext(textLine);
                --l;
            }
        }
        catch (Throwable err) {
            RunUtil.runAndTry(reader::close);
            subscriber.onError(err);
        }
    }

    /**
     * Streams the server-sent events of {@code inputStream} to {@code subscriber} using the
     * default charset.
     *
     * @param inputStream source of the event stream
     * @param subscriber  receiver of the events
     * @throws IOException declared for backward compatibility only; failures are reported
     *                     through {@code subscriber.onError}
     * @deprecated use {@link #parseSseStream(InputStream, Charset, Subscriber)} instead
     */
    @Deprecated
    public static void parseEventStream(InputStream inputStream, Subscriber<? super ServerSentEvent> subscriber) throws IOException {
        parseSseStream(inputStream, null, subscriber);
    }

    /**
     * Creates a publisher that emits the server-sent events of {@code inputStream}, decoded
     * with the default charset ({@code Solon.encoding()}).
     *
     * @param inputStream source of the event stream
     * @return a publisher of events
     */
    public static Publisher<ServerSentEvent> parseSseStream(InputStream inputStream) {
        return subscriber -> {
            try {
                parseSseStream(inputStream, null, subscriber);
            }
            catch (Exception e) {
                subscriber.onError(e);
            }
        };
    }

    /**
     * Creates a publisher that emits the server-sent events of the response body, decoded
     * with the response's content encoding.
     *
     * @param response response whose body is read
     * @return a publisher of events
     */
    public static Publisher<ServerSentEvent> parseSseStream(HttpResponse response) {
        return subscriber -> {
            try {
                parseSseStream(response.body(), response.contentEncoding(), subscriber);
            }
            catch (Exception e) {
                subscriber.onError(e);
            }
        };
    }

    /**
     * Streams the server-sent events of the response body to {@code subscriber}.
     *
     * @param response   response whose body is read
     * @param subscriber receiver of the events
     */
    public static void parseSseStream(HttpResponse response, Subscriber<? super ServerSentEvent> subscriber) {
        parseSseStream(response.body(), response.contentEncoding(), subscriber);
    }

    /**
     * Streams the server-sent events of {@code inputStream} to {@code subscriber}.
     *
     * <p>An event is emitted for every blank-line terminated block that carries at least one
     * non-empty {@code data:} value. Multiple {@code data:} lines are joined with {@code "\n"};
     * every other {@code name: value} line is passed along as event metadata. An event whose
     * data is exactly {@code [DONE]} is emitted and then completes the sequence.
     *
     * @param inputStream source of the event stream
     * @param charset     charset used to decode the stream; {@code null} selects {@code Solon.encoding()}
     * @param subscriber  receiver of the events
     */
    public static void parseSseStream(InputStream inputStream, Charset charset, Subscriber<? super ServerSentEvent> subscriber) {
        CloseTrackableBufferedReader reader = newReader(inputStream, charset);
        subscriber.onSubscribe(new SimpleSubscription().onRequest((subscription, l) -> onSseStreamRequestDo(reader, subscriber, subscription, l)));
    }

    /**
     * Serves one {@code request(l)}: emits up to {@code l} events, or terminates the sequence.
     *
     * <p>The loop only ends on an event boundary (the demand counter is decremented when an
     * event is emitted), so the per-call buffers never hold a partial event when it returns
     * normally.
     */
    private static void onSseStreamRequestDo(CloseTrackableBufferedReader reader, Subscriber<? super ServerSentEvent> subscriber, SimpleSubscription subscription, long l) {
        try {
            HashMap<String, String> meta = new HashMap<>();
            StringBuilder data = new StringBuilder();
            while (l > 0L) {
                if (subscription.isCancelled()) {
                    RunUtil.runAndTry(reader::close);
                    return;
                }
                if (reader.isClosed()) {
                    // Closed by an earlier terminal signal or cancellation; do not signal again.
                    return;
                }
                String textLine = reader.readLine();
                if (textLine == null) {
                    // End of stream; a block that was not terminated by a blank line is dropped.
                    RunUtil.runAndTry(reader::close);
                    subscriber.onComplete();
                    return;
                }

                if (textLine.isEmpty()) {
                    // Blank line: end of the current block.
                    if (data.length() == 0) {
                        // Nothing to dispatch. Reset the metadata so that fields of a
                        // data-less block do not leak into the next event.
                        meta.clear();
                        continue;
                    }
                    String dataStr = data.toString();
                    subscriber.onNext(new ServerSentEvent(meta, dataStr));
                    --l;
                    // Start the next block with a fresh map rather than clearing this one:
                    // ServerSentEvent is not visible here and may keep a reference to it.
                    meta = new HashMap<>();
                    data.setLength(0);
                    if (SSE_DONE_MARKER.equals(dataStr)) {
                        RunUtil.runAndTry(reader::close);
                        subscriber.onComplete();
                        return;
                    }
                    continue;
                }

                if (textLine.startsWith(SSE_DATA_PREFIX)) {
                    String content = textLine.substring(SSE_DATA_PREFIX.length());
                    if (data.length() > 0) {
                        data.append("\n");
                    }
                    // Surrounding whitespace of the value is trimmed (existing behavior).
                    data.append(content.trim());
                    continue;
                }

                // Any other "name: value" line is metadata. Lines without a colon, or that
                // start with one (SSE comments), are ignored.
                int flagIdx = textLine.indexOf(':');
                if (flagIdx > 0) {
                    meta.put(textLine.substring(0, flagIdx).trim(), textLine.substring(flagIdx + 1).trim());
                }
            }
        }
        catch (Throwable err) {
            RunUtil.runAndTry(reader::close);
            subscriber.onError(err);
        }
    }

    /**
     * Wraps {@code inputStream} in a close-trackable buffered reader.
     *
     * @param charset charset used for decoding; {@code null} selects {@code Solon.encoding()}
     */
    private static CloseTrackableBufferedReader newReader(InputStream inputStream, Charset charset) {
        Charset effectiveCharset = charset != null ? charset : Charset.forName(Solon.encoding());
        return new CloseTrackableBufferedReader(new InputStreamReader(inputStream, effectiveCharset), READER_BUFFER_SIZE);
    }
}

