/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.awscore.eventstream;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.awscore.eventstream.EventStreamResponseHandler;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.http.HttpResponse;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.internal.util.ThrowableUtils;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.FunctionalUtils;
import software.amazon.eventstream.HeaderValue;
import software.amazon.eventstream.Message;
import software.amazon.eventstream.MessageDecoder;

/**
 * {@link AsyncResponseTransformer} that feeds the raw response bytes into a {@link MessageDecoder}
 * and dispatches every decoded {@link Message} to an {@link EventStreamResponseHandler}:
 * <ul>
 *   <li>an {@code event} message of type {@code initial-response} is unmarshalled and passed to
 *       {@link EventStreamResponseHandler#responseReceived};</li>
 *   <li>any other {@code event} message is unmarshalled and passed to the subscriber of the event
 *       publisher via {@code onNext};</li>
 *   <li>an {@code error} or {@code exception} message is unmarshalled into a {@link Throwable} and
 *       routed through {@link #exceptionOccurred(Throwable)}.</li>
 * </ul>
 *
 * <p>Terminal state transitions ({@link #exceptionOccurred(Throwable)} and {@link #complete()}) are
 * guarded by the monitor of this instance so that only the first terminal signal is forwarded.
 *
 * @param <ResponseT> type of the unmarshalled initial response
 * @param <EventT> type of the unmarshalled events
 */
@SdkProtectedApi
public class EventStreamAsyncResponseTransformer<ResponseT, EventT>
implements AsyncResponseTransformer<SdkResponse, Void> {
    private static final Logger log = LoggerFactory.getLogger(EventStreamAsyncResponseTransformer.class);
    private static final ExecutionAttributes EMPTY_EXECUTION_ATTRIBUTES = new ExecutionAttributes();

    /** Header carrying the kind of message ({@code event}, {@code error} or {@code exception}). */
    private static final String MESSAGE_TYPE_HEADER = ":message-type";
    /** Header carrying the event type of an {@code event} message. */
    private static final String EVENT_TYPE_HEADER = ":event-type";
    /** Event type that marks the initial response rather than a regular event. */
    private static final String INITIAL_RESPONSE_EVENT_TYPE = "initial-response";

    private final EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseTransformer;
    private final HttpResponseHandler<? extends ResponseT> initialResponseUnmarshaller;
    private final HttpResponseHandler<? extends EventT> eventUnmarshaller;
    private final HttpResponseHandler<? extends Throwable> exceptionUnmarshaller;

    /** Events requested by the event subscriber that have not been delivered yet. */
    private final AtomicLong remainingDemand = new AtomicLong(0L);
    /** The single subscriber of the event publisher; {@code null} until someone subscribes. */
    private final AtomicReference<Subscriber<? super EventT>> subscriberRef = new AtomicReference<>();
    private final MessageDecoder decoder = this.createDecoder();
    /** Set once a terminal signal has been forwarded; reset by {@link #onStream(SdkPublisher)}. */
    private volatile boolean isDone = false;
    /** The throwable recorded by the first {@link #exceptionOccurred(Throwable)} call. */
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /**
     * @param eventStreamResponseTransformer handler that receives the initial response, the event
     *        publisher, and the terminal signals
     * @param initialResponseUnmarshaller unmarshaller for the {@code initial-response} event
     * @param eventUnmarshaller unmarshaller for all other events
     * @param exceptionUnmarshaller unmarshaller for {@code error} and {@code exception} messages
     */
    public EventStreamAsyncResponseTransformer(EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseTransformer, HttpResponseHandler<? extends ResponseT> initialResponseUnmarshaller, HttpResponseHandler<? extends EventT> eventUnmarshaller, HttpResponseHandler<? extends Throwable> exceptionUnmarshaller) {
        this.eventStreamResponseTransformer = eventStreamResponseTransformer;
        this.initialResponseUnmarshaller = initialResponseUnmarshaller;
        this.eventUnmarshaller = eventUnmarshaller;
        this.exceptionUnmarshaller = exceptionUnmarshaller;
    }

    /**
     * No-op. The response passed here is ignored; the handler's {@code responseReceived} is invoked
     * from the decoder callback when an {@code initial-response} event is decoded.
     */
    @Override
    public void responseReceived(SdkResponse response) {
    }

    /**
     * Subscribes to the raw byte stream and, once the upstream subscription is available, hands an
     * event publisher to the handler's {@code onEventStream}.
     *
     * <p>If {@code onEventStream} throws, the upstream subscription is cancelled and the throwable is
     * routed through {@link #exceptionOccurred(Throwable)}.
     *
     * @param publisher publisher of the raw response bytes
     */
    @Override
    public void onStream(SdkPublisher<ByteBuffer> publisher) {
        synchronized (this) {
            // Clear the terminal flag so terminal signals for this stream are forwarded again.
            // NOTE(review): presumably this supports retried attempts - confirm with callers.
            this.isDone = false;
        }
        CompletableFuture<Subscription> dataSubscriptionFuture = new CompletableFuture<>();
        publisher.subscribe(new ByteSubscriber(dataSubscriptionFuture));
        dataSubscriptionFuture.thenAccept(dataSubscription -> {
            SdkPublisher<EventT> eventPublisher = new EventPublisher(dataSubscription);
            try {
                this.eventStreamResponseTransformer.onEventStream(eventPublisher);
            }
            catch (Throwable t) {
                dataSubscription.cancel();
                this.exceptionOccurred(t);
            }
        });
    }

    /**
     * Records {@code throwable} and forwards it to the event subscriber (if any) and to the handler.
     * Only the first terminal signal is processed; later calls are ignored until
     * {@link #onStream(SdkPublisher)} resets the state.
     *
     * @param throwable the failure to propagate
     */
    @Override
    public void exceptionOccurred(Throwable throwable) {
        synchronized (this) {
            if (this.isDone) {
                return;
            }
            this.isDone = true;
            this.error.set(throwable);
            Subscriber<? super EventT> subscriber = this.subscriberRef.get();
            if (subscriber != null) {
                // A misbehaving subscriber must not prevent the handler from being notified.
                FunctionalUtils.runAndLogError(log, "Error thrown from Subscriber#onError, ignoring.", () -> subscriber.onError(throwable));
            }
            this.eventStreamResponseTransformer.exceptionOccurred(throwable);
        }
    }

    /**
     * Signals completion to the event subscriber (if any) and to the handler.
     *
     * @return always {@code null}
     * @throws RuntimeException produced by {@link ThrowableUtils#failure(Throwable)} from the recorded
     *         error when a terminal signal has already been processed
     */
    @Override
    public Void complete() {
        synchronized (this) {
            if (this.isDone) {
                // A terminal signal was already forwarded; surface the recorded error to the caller.
                throw ThrowableUtils.failure(this.error.get());
            }
            this.isDone = true;
            Subscriber<? super EventT> subscriber = this.subscriberRef.get();
            if (subscriber != null) {
                FunctionalUtils.runAndLogError(log, "Error thrown from Subscriber#onComplete, ignoring.", subscriber::onComplete);
            }
            this.eventStreamResponseTransformer.complete();
            return null;
        }
    }

    /**
     * Builds the decoder whose callback dispatches each decoded message by its message type.
     * Any {@link Exception} raised while handling a message is rethrown wrapped in an
     * {@link SdkClientException}.
     */
    private MessageDecoder createDecoder() {
        return new MessageDecoder(m -> {
            try {
                if (this.isEvent(m)) {
                    if (m.getHeaders().get(EVENT_TYPE_HEADER).getString().equals(INITIAL_RESPONSE_EVENT_TYPE)) {
                        this.eventStreamResponseTransformer.responseReceived(this.initialResponseUnmarshaller.handle(this.adaptMessageToResponse(m), EMPTY_EXECUTION_ATTRIBUTES));
                    } else {
                        // One unit of demand is consumed per delivered event.
                        this.remainingDemand.decrementAndGet();
                        this.subscriberRef.get().onNext(this.eventUnmarshaller.handle(this.adaptMessageToResponse(m), EMPTY_EXECUTION_ATTRIBUTES));
                    }
                } else if (this.isError(m) || this.isException(m)) {
                    Throwable exception = this.exceptionUnmarshaller.handle(this.adaptMessageToResponse(m), EMPTY_EXECUTION_ATTRIBUTES);
                    FunctionalUtils.runAndLogError(log, "Error thrown from exceptionOccurred, ignoring.", () -> this.exceptionOccurred(exception));
                }
            }
            catch (Exception e) {
                throw SdkClientException.builder().cause(e).build();
            }
        });
    }

    /** Returns whether the message's {@code :message-type} header equals {@code event}. */
    private boolean isEvent(Message m) {
        return "event".equals(m.getHeaders().get(MESSAGE_TYPE_HEADER).getString());
    }

    /** Returns whether the message's {@code :message-type} header equals {@code error}. */
    private boolean isError(Message m) {
        return "error".equals(m.getHeaders().get(MESSAGE_TYPE_HEADER).getString());
    }

    /** Returns whether the message's {@code :message-type} header equals {@code exception}. */
    private boolean isException(Message m) {
        return "exception".equals(m.getHeaders().get(MESSAGE_TYPE_HEADER).getString());
    }

    /**
     * Wraps a decoded message in an {@link HttpResponse} so it can be passed to the unmarshallers:
     * the payload becomes the content and every message header is copied using its string value.
     */
    private HttpResponse adaptMessageToResponse(Message m) {
        HttpResponse response = new HttpResponse(null);
        response.setContent(new ByteArrayInputStream(m.getPayload()));
        m.getHeaders().forEach((k, v) -> response.addHeader(k, v.getString()));
        return response;
    }

    /**
     * Publisher of unmarshalled events handed to the response handler. It accepts exactly one
     * subscriber and translates that subscriber's demand into requests on the raw byte subscription.
     */
    private class EventPublisher
    implements SdkPublisher<EventT> {
        private final Subscription dataSubscription;

        private EventPublisher(Subscription dataSubscription) {
            this.dataSubscription = dataSubscription;
        }

        /**
         * Registers the single event subscriber.
         *
         * @throws IllegalStateException if a subscriber has already been registered
         */
        @Override
        public void subscribe(Subscriber<? super EventT> subscriber) {
            if (!subscriberRef.compareAndSet(null, subscriber)) {
                log.error("Event stream publishers can only be subscribed to once.");
                throw new IllegalStateException("This publisher may only be subscribed to once");
            }
            subscriber.onSubscribe(new Subscription(){

                @Override
                public void request(long l) {
                    // Record the demand BEFORE pulling from upstream. If upstream delivers data
                    // synchronously from request(), ByteSubscriber#onNext must already see the new
                    // demand, otherwise it would stop requesting while demand is still outstanding.
                    remainingDemand.addAndGet(l);
                    dataSubscription.request(1L);
                }

                @Override
                public void cancel() {
                    dataSubscription.cancel();
                }
            });
        }
    }

    /**
     * Subscriber of the raw byte stream. Every buffer is copied and fed to the decoder; another
     * buffer is requested for as long as the event subscriber has outstanding demand.
     */
    private class ByteSubscriber
    implements Subscriber<ByteBuffer> {
        private final CompletableFuture<Subscription> dataSubscriptionFuture;
        private Subscription subscription;

        private ByteSubscriber(CompletableFuture<Subscription> dataSubscriptionFuture) {
            this.dataSubscriptionFuture = dataSubscriptionFuture;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            // Store the subscription BEFORE completing the future: completing it may synchronously
            // run the dependent stage in onStream, which can trigger a request and a synchronous
            // onNext that dereferences this field.
            this.subscription = subscription;
            this.dataSubscriptionFuture.complete(subscription);
        }

        @Override
        public void onNext(ByteBuffer buffer) {
            decoder.feed(BinaryUtils.copyBytesFrom(buffer));
            // A buffer may not contain a full event, so keep pulling while demand is unmet.
            if (remainingDemand.get() > 0L) {
                this.subscription.request(1L);
            }
        }

        /**
         * Intentionally empty: this subscriber does not forward upstream errors.
         * NOTE(review): presumably failures reach the handler via the transformer's
         * exceptionOccurred instead - confirm with the caller of this transformer.
         */
        @Override
        public void onError(Throwable throwable) {
        }

        /**
         * Intentionally empty: this subscriber does not forward upstream completion.
         * NOTE(review): presumably completion reaches the handler via the transformer's
         * complete() instead - confirm with the caller of this transformer.
         */
        @Override
        public void onComplete() {
        }
    }
}

