/*
 * Decompiled with CFR 0.152.
 */
package com.luna.common.net.async;

import com.luna.common.net.hander.AbstactEventFutureCallback;
import com.luna.common.net.hander.LoggingEventSourceListener;
import com.luna.common.net.sse.Event;
import com.luna.common.net.sse.SseResponse;
import java.io.IOException;
import java.nio.CharBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.hc.client5.http.async.methods.AbstractCharResponseConsumer;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomSseAsyncConsumer
extends AbstractCharResponseConsumer<SseResponse> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CustomSseAsyncConsumer.class);
    private final BlockingQueue<Event> events;
    private final AbstactEventFutureCallback<SseResponse, Event> callback;
    private final Integer timeWait;
    private SseResponse response;

    public CustomSseAsyncConsumer(AbstactEventFutureCallback<SseResponse, Event> callback) {
        this(100, callback, 1000);
    }

    public CustomSseAsyncConsumer(Integer capacity, AbstactEventFutureCallback<SseResponse, Event> callback, Integer timeWait) {
        this.events = new ArrayBlockingQueue<Event>(capacity);
        this.callback = callback;
        this.timeWait = timeWait;
    }

    public CustomSseAsyncConsumer() {
        this(100, new LoggingEventSourceListener<SseResponse, Event>(), 1000);
    }

    public CustomSseAsyncConsumer(final FutureCallback<Event> listener) {
        this(100, new AbstactEventFutureCallback<SseResponse, Event>(){

            @Override
            public void onEvent(Event result) {
                listener.completed((Object)result);
            }
        }, 1000);
    }

    protected void start(HttpResponse response, ContentType contentType) throws HttpException, IOException {
        this.response = new SseResponse(response, contentType);
        this.response.getEntity().setEvents(this.events);
    }

    protected int capacityIncrement() {
        return Integer.MAX_VALUE;
    }

    protected void data(CharBuffer data, boolean endOfStream) throws IOException {
        this.response.getEntity().pushBuffer(data, endOfStream);
        try {
            Event poll = this.events.poll(this.timeWait.intValue(), TimeUnit.MILLISECONDS);
            if (poll != null) {
                log.info("data::data = {}, endOfStream = {}", (Object)poll, (Object)endOfStream);
                this.callback.onEvent(poll);
            }
        }
        catch (InterruptedException e) {
            log.error("data::data = {}, endOfStream = {} ", new Object[]{data, endOfStream, e});
        }
    }

    protected SseResponse buildResult() {
        return this.response;
    }

    public void releaseResources() {
        this.callback.completed(this.response);
    }

    public void failed(Exception cause) {
        this.callback.failed(cause);
    }
}

