/*
 * Decompiled with CFR 0.152.
 */
package com.azure.ai.agents.persistent.models;

import com.azure.ai.agents.persistent.models.PersistentAgentStreamEvent;
import com.azure.ai.agents.persistent.models.StreamTypeFactory;
import com.azure.ai.agents.persistent.models.StreamUpdate;
import com.azure.core.util.BinaryData;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public final class PersistentAgentServerSentEvents {
    private static final int SSE_CHUNK_LINE_BREAK_COUNT_MARKER = 2;
    private final ClientLogger logger = new ClientLogger(PersistentAgentServerSentEvents.class);
    private final StreamTypeFactory eventDeserializer = new StreamTypeFactory();
    private final Flux<ByteBuffer> source;
    private ByteArrayOutputStream outStream;

    public PersistentAgentServerSentEvents(Flux<ByteBuffer> source) {
        this.source = source;
        this.outStream = new ByteArrayOutputStream();
    }

    public Flux<StreamUpdate> getEvents() {
        return this.mapEventStream();
    }

    private Flux<StreamUpdate> mapEventStream() {
        return this.source.publishOn(Schedulers.boundedElastic()).concatMap(byteBuffer -> {
            ArrayList<StreamUpdate> values = new ArrayList<StreamUpdate>();
            byte[] byteArray = FluxUtil.byteBufferToArray((ByteBuffer)byteBuffer);
            byte[] outByteArray = this.outStream.toByteArray();
            int lineBreakCharsEncountered = outByteArray.length > 0 && this.isByteLineFeed(outByteArray[outByteArray.length - 1]) ? 1 : 0;
            int startIndex = 0;
            for (int i = 0; i < byteArray.length; ++i) {
                byte currentByte = byteArray[i];
                if (this.isByteLineFeed(currentByte)) {
                    if (++lineBreakCharsEncountered != 2) continue;
                    this.outStream.write(byteArray, startIndex, i - startIndex + 1);
                    try {
                        String currentLine = this.outStream.toString(StandardCharsets.UTF_8.name());
                        this.handleCurrentEvent(currentLine, values);
                    }
                    catch (IOException e) {
                        return Flux.error((Throwable)e);
                    }
                    this.outStream = new ByteArrayOutputStream();
                    startIndex = i + 1;
                    continue;
                }
                if (this.isByteCarriageReturn(currentByte)) continue;
                lineBreakCharsEncountered = 0;
            }
            if (startIndex < byteArray.length) {
                this.outStream.write(byteArray, startIndex, byteArray.length - startIndex);
            }
            try {
                String remainingBytes = this.outStream.toString(StandardCharsets.UTF_8.name());
                if (remainingBytes.endsWith("\n\n") || remainingBytes.endsWith("\r\n\r\n")) {
                    this.handleCurrentEvent(remainingBytes, values);
                }
            }
            catch (UncheckedIOException | IllegalArgumentException e) {
                return Flux.fromIterable(values);
            }
            catch (IOException e) {
                return Flux.error((Throwable)e);
            }
            return Flux.fromIterable(values);
        }).cache();
    }

    private boolean isByteLineFeed(byte character) {
        return character == 10;
    }

    private boolean isByteCarriageReturn(byte character) {
        return character == 13;
    }

    public void handleCurrentEvent(String currentEvent, List<StreamUpdate> outputValues) throws IllegalArgumentException {
        if (currentEvent.isEmpty()) {
            return;
        }
        String[] lines = currentEvent.split("\n", 2);
        if (lines.length != 2) {
            return;
        }
        if (lines[0].isEmpty() || lines[1].isEmpty()) {
            return;
        }
        String[] eventParts = lines[0].split(":", 2);
        String[] dataParts = lines[1].split(":", 2);
        if (eventParts.length != 2 || !eventParts[0].trim().equals("event")) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("Invalid event format: missing event name"));
        }
        String eventName = eventParts[1].trim();
        if (dataParts.length != 2 || !dataParts[0].trim().equals("data")) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("Invalid event format: missing event data"));
        }
        String eventJson = dataParts[1].trim();
        if (PersistentAgentStreamEvent.DONE.equals((Object)PersistentAgentStreamEvent.fromString(eventName))) {
            return;
        }
        if (PersistentAgentStreamEvent.ERROR.equals((Object)PersistentAgentStreamEvent.fromString(eventName))) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException(eventJson));
        }
        outputValues.add(this.eventDeserializer.deserializeEvent(eventName, BinaryData.fromString((String)eventJson)));
    }
}

