/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.awscore.eventstream;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.ReviewBeforeRelease;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.awscore.eventstream.EventStreamResponseHandler;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.http.HttpResponse;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.core.internal.util.ThrowableUtils;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.FunctionalUtils;
import software.amazon.eventstream.HeaderValue;
import software.amazon.eventstream.Message;
import software.amazon.eventstream.MessageDecoder;

@SdkProtectedApi
public class EventStreamAsyncResponseTransformer<ResponseT, EventT>
implements AsyncResponseTransformer<SdkResponse, Void> {
    private static final Logger log = LoggerFactory.getLogger(EventStreamAsyncResponseTransformer.class);

    /** Sentinel queued by {@code complete()}; when {@code drainEvents()} sees it at the head it finishes the stream. */
    private static final Object ON_COMPLETE_EVENT = new Object();

    /** Shared attribute set passed to the initial-response and event handlers. */
    private static final ExecutionAttributes EMPTY_EXECUTION_ATTRIBUTES = new ExecutionAttributes();

    /** User-facing handler notified of the initial response, the event publisher, errors and completion. */
    private final EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler;

    /** Unmarshalls the {@code initial-response} message. */
    private final HttpResponseHandler<? extends ResponseT> initialResponseHandler;

    /** Unmarshalls every other event message. */
    private final HttpResponseHandler<? extends EventT> eventResponseHandler;

    /** Unmarshalls {@code error} / {@code exception} messages into a {@link Throwable}. */
    private final HttpResponseHandler<? extends Throwable> exceptionResponseHandler;

    /** Outstanding subscriber demand: raised in {@code Subscription#request}, lowered per event in {@code drainEvents()}. */
    private final AtomicLong remainingDemand = new AtomicLong(0L);

    /** The single event subscriber; set once via compare-and-set in {@code EventPublisher#subscribe}. */
    private final AtomicReference<Subscriber<? super EventT>> subscriberRef = new AtomicReference<>();

    /** Subscription to the raw byte publisher; set in {@code ByteSubscriber#onSubscribe}. */
    private final AtomicReference<Subscription> dataSubscription = new AtomicReference<>();

    /** Decodes raw bytes into messages and hands each one to {@code handleMessage}. */
    private final MessageDecoder decoder = new MessageDecoder(this::handleMessage);

    /** Set to true by {@code exceptionOccurred} and {@code onEventComplete}; reset to false by {@code onStream}. */
    private volatile boolean isDone = false;

    /** Throwable recorded by {@code exceptionOccurred}; consulted by {@code complete()}. */
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /** Executor on which events are delivered to the subscriber and follow-up drains are scheduled. */
    private final Executor executor;

    /**
     * Decoded events (and possibly {@link #ON_COMPLETE_EVENT}) awaiting delivery. {@code drainEvents()},
     * {@code Subscription#request} and {@code ByteSubscriber#onNext} synchronize on this queue.
     * A {@link LinkedList} is kept deliberately: unlike {@code ArrayDeque} it accepts {@code null} elements,
     * so a handler that returns {@code null} does not make {@code add} throw.
     */
    private final Queue<Object> eventsToDeliver = new LinkedList<Object>();

    /** True while a drain loop is active; guards against starting a second one. */
    private final AtomicBoolean isDelivering = new AtomicBoolean(false);

    /** True while a request for more upstream data is considered in flight. */
    private final AtomicBoolean isRequesting = new AtomicBoolean(false);

    /** Completed with {@code null} by {@code onEventComplete()}. */
    private final CompletableFuture<Void> future;

    /** Passed as the {@code SERVICE_NAME} execution attribute when unmarshalling error messages. */
    private final String serviceName;

    /** Value of the {@code x-amzn-RequestId} response header, captured in {@code responseReceived}. */
    private String requestId = null;

    /** Value of the {@code x-amz-id-2} response header, captured in {@code responseReceived}. */
    private String extendedRequestId = null;

    /**
     * Creates a transformer with a freshly created single-thread scheduled executor, a new
     * (otherwise unobserved) future and an empty service name.
     *
     * <p>NOTE(review): the executor created here is never shut down by this class — confirm whether
     * callers of this constructor can leak its thread.
     *
     * @param eventStreamResponseHandler handler notified of the response, the event publisher, errors and completion
     * @param initialResponseHandler unmarshaller for the {@code initial-response} message
     * @param eventResponseHandler unmarshaller for event messages
     * @param exceptionResponseHandler unmarshaller for error/exception messages
     * @deprecated prefer {@link #builder()}, which lets the caller supply the executor, future and service name.
     */
    @Deprecated
    @ReviewBeforeRelease(value="Remove this on full GA of 2.0.0")
    public EventStreamAsyncResponseTransformer(
            EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler,
            HttpResponseHandler<? extends ResponseT> initialResponseHandler,
            HttpResponseHandler<? extends EventT> eventResponseHandler,
            HttpResponseHandler<? extends Throwable> exceptionResponseHandler) {
        this(eventStreamResponseHandler,
             initialResponseHandler,
             eventResponseHandler,
             exceptionResponseHandler,
             Executors.newSingleThreadScheduledExecutor(),
             new CompletableFuture<Void>(),
             "");
    }

    /**
     * Full constructor used by {@code Builder#build()} and the deprecated public constructor.
     * Stores every argument as-is; no validation is performed.
     */
    private EventStreamAsyncResponseTransformer(
            EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler,
            HttpResponseHandler<? extends ResponseT> initialResponseHandler,
            HttpResponseHandler<? extends EventT> eventResponseHandler,
            HttpResponseHandler<? extends Throwable> exceptionResponseHandler,
            Executor executor,
            CompletableFuture<Void> future,
            String serviceName) {
        // Collaborators supplied by the caller.
        this.serviceName = serviceName;
        this.future = future;
        this.executor = executor;
        // Handlers, from the user-facing one down to the per-message unmarshallers.
        this.eventStreamResponseHandler = eventStreamResponseHandler;
        this.exceptionResponseHandler = exceptionResponseHandler;
        this.eventResponseHandler = eventResponseHandler;
        this.initialResponseHandler = initialResponseHandler;
    }

    /**
     * Remembers the request identifiers from the HTTP response headers so they can later be
     * attached to each per-message response built by {@code adaptMessageToResponse}.
     *
     * @param response the initial SDK response; ignored when it or its HTTP response is {@code null}
     */
    public void responseReceived(SdkResponse response) {
        if (response == null || response.sdkHttpResponse() == null) {
            return;
        }
        // Either header may be absent, in which case the corresponding field becomes null.
        this.requestId = response.sdkHttpResponse()
                                 .firstMatchingHeader("x-amzn-RequestId")
                                 .orElse(null);
        this.extendedRequestId = response.sdkHttpResponse()
                                         .firstMatchingHeader("x-amz-id-2")
                                         .orElse(null);
    }

    /**
     * Subscribes to the raw byte publisher and, once the upstream {@link Subscription} is available,
     * hands an event publisher to the user's handler.
     *
     * <p>If the handler's {@code onEventStream} throws, the failure is routed through
     * {@link #exceptionOccurred(Throwable)} and the upstream subscription is cancelled.
     *
     * @param publisher publisher of the raw, still-encoded event stream bytes
     */
    public void onStream(SdkPublisher<ByteBuffer> publisher) {
        // A new stream resets the "done" state.
        synchronized (this) {
            this.isDone = false;
        }
        // Typed future: the callback below needs a Subscription, not a raw Object.
        CompletableFuture<Subscription> dataSubscriptionFuture = new CompletableFuture<>();
        publisher.subscribe(new ByteSubscriber(dataSubscriptionFuture));
        dataSubscriptionFuture.thenAccept(subscription -> {
            EventPublisher eventPublisher = new EventPublisher(subscription);
            try {
                this.eventStreamResponseHandler.onEventStream(eventPublisher);
            } catch (Throwable t) {
                this.exceptionOccurred(t);
                subscription.cancel();
            }
        });
    }

    /**
     * Records the first failure and notifies interested parties; later calls (or calls after the
     * stream has already finished) are ignored.
     *
     * <p>When a subscriber is registered its {@code onError} is invoked through
     * {@code FunctionalUtils.runAndLogError}; afterwards the user's handler is notified.
     *
     * @param throwable the failure to report
     */
    public void exceptionOccurred(Throwable throwable) {
        synchronized (this) {
            if (this.isDone) {
                return;
            }
            this.isDone = true;
            this.error.set(throwable);
            boolean hasSubscriber = this.subscriberRef.get() != null;
            if (hasSubscriber) {
                FunctionalUtils.runAndLogError(
                    log,
                    "Error thrown from Subscriber#onError, ignoring.",
                    () -> this.subscriberRef.get().onError(throwable));
            }
            this.eventStreamResponseHandler.exceptionOccurred(throwable);
        }
    }

    /**
     * Signals that the upstream has finished. When no error was recorded, the completion sentinel
     * is queued behind any pending events and a drain is started if none is running.
     *
     * @return always {@code null}
     * @throws RuntimeException the value produced by {@code ThrowableUtils.failure} for the recorded
     *         error, when {@code exceptionOccurred} was called earlier
     */
    public Void complete() {
        Throwable failure = this.error.get();
        if (failure != null) {
            throw ThrowableUtils.failure(failure);
        }
        // The queue is a plain LinkedList that drainEvents/request/onNext only touch while holding
        // its monitor; mutate it under the same lock so a concurrent drain cannot corrupt it.
        synchronized (this.eventsToDeliver) {
            this.eventsToDeliver.add(ON_COMPLETE_EVENT);
        }
        this.drainEventsIfNotAlready();
        return null;
    }

    /**
     * Finishes the stream: marks it done, tells the subscriber it is complete, notifies the
     * user's handler and finally completes the future with {@code null}.
     */
    private void onEventComplete() {
        synchronized (this) {
            this.isDone = true;
            // The subscriber lookup happens inside the guarded runnable, as does onComplete itself.
            FunctionalUtils.runAndLogError(
                log,
                "Error thrown from Subscriber#onComplete, ignoring.",
                () -> this.subscriberRef.get().onComplete());
            this.eventStreamResponseHandler.complete();
            this.future.complete(null);
        }
    }

    /**
     * Callback for the decoder: routes one decoded message.
     *
     * <ul>
     *   <li>{@code event} messages of type {@code initial-response} are unmarshalled and passed
     *       straight to the user's handler;</li>
     *   <li>other {@code event} messages are unmarshalled and queued for delivery;</li>
     *   <li>{@code error} / {@code exception} messages are unmarshalled into a throwable and
     *       reported through {@link #exceptionOccurred(Throwable)};</li>
     *   <li>any other message type is ignored.</li>
     * </ul>
     *
     * @param m the decoded message
     * @throws SdkClientException wrapping any {@link Exception} raised while handling the message
     */
    private void handleMessage(Message m) {
        try {
            if (!this.isEvent(m)) {
                if (this.isError(m) || this.isException(m)) {
                    HttpResponse errorResponse = this.adaptMessageToResponse(m, true);
                    ExecutionAttributes attributes =
                        new ExecutionAttributes().putAttribute(SdkExecutionAttribute.SERVICE_NAME, this.serviceName);
                    Throwable exception = this.exceptionResponseHandler.handle(errorResponse, attributes);
                    FunctionalUtils.runAndLogError(
                        log,
                        "Error thrown from exceptionOccurred, ignoring.",
                        () -> this.exceptionOccurred(exception));
                }
                return;
            }

            String eventType = m.getHeaders().get(":event-type").getString();
            if (eventType.equals("initial-response")) {
                this.eventStreamResponseHandler.responseReceived(
                    this.initialResponseHandler.handle(this.adaptMessageToResponse(m, false), EMPTY_EXECUTION_ATTRIBUTES));
            } else {
                this.eventsToDeliver.add(
                    this.eventResponseHandler.handle(this.adaptMessageToResponse(m, false), EMPTY_EXECUTION_ATTRIBUTES));
            }
        } catch (Exception e) {
            throw SdkClientException.builder().cause(e).build();
        }
    }

    /**
     * @param m decoded message; its {@code :message-type} header must be present
     * @return {@code true} when the {@code :message-type} header equals {@code "event"}
     */
    private boolean isEvent(Message m) {
        HeaderValue messageType = m.getHeaders().get(":message-type");
        return "event".equals(messageType.getString());
    }

    /**
     * @param m decoded message; its {@code :message-type} header must be present
     * @return {@code true} when the {@code :message-type} header equals {@code "error"}
     */
    private boolean isError(Message m) {
        HeaderValue messageType = m.getHeaders().get(":message-type");
        return "error".equals(messageType.getString());
    }

    /**
     * @param m decoded message; its {@code :message-type} header must be present
     * @return {@code true} when the {@code :message-type} header equals {@code "exception"}
     */
    private boolean isException(Message m) {
        HeaderValue messageType = m.getHeaders().get(":message-type");
        return "exception".equals(messageType.getString());
    }

    /**
     * Wraps a decoded message in an {@link HttpResponse} so the regular response handlers can
     * unmarshall it: the payload becomes the content, every message header is copied as a string
     * header, and the request identifiers captured earlier are added when available.
     *
     * @param m the decoded message
     * @param isException whether the message represents a failure; selects status 500 instead of 200
     * @return the synthesized response
     */
    private HttpResponse adaptMessageToResponse(Message m, boolean isException) {
        HttpResponse adapted = new HttpResponse(null);
        adapted.setContent(new ByteArrayInputStream(m.getPayload()));
        m.getHeaders().forEach((name, value) -> adapted.addHeader(name, value.getString()));

        if (this.requestId != null) {
            adapted.addHeader("x-amzn-RequestId", this.requestId);
        }
        if (this.extendedRequestId != null) {
            adapted.addHeader("x-amz-id-2", this.extendedRequestId);
        }

        int statusCode;
        if (isException) {
            statusCode = 500;
        } else {
            statusCode = 200;
        }
        adapted.setStatusCode(statusCode);
        return adapted;
    }

    /**
     * Asks the upstream byte publisher for one more buffer, unless a request is already
     * flagged as in flight.
     */
    private void requestDataIfNotAlready() {
        boolean claimed = this.isRequesting.compareAndSet(false, true);
        if (!claimed) {
            return;
        }
        this.dataSubscription.get().request(1L);
    }

    /**
     * Starts draining queued events unless a drain is already flagged as running.
     */
    private void drainEventsIfNotAlready() {
        boolean claimed = this.isDelivering.compareAndSet(false, true);
        if (!claimed) {
            return;
        }
        this.drainEvents();
    }

    /**
     * Performs one step of the delivery loop. Each step either completes the stream, pauses the
     * loop, or hands a single event to the executor and schedules the next step behind it.
     *
     * <p>Callers are expected to have set {@code isDelivering} first (see
     * {@code drainEventsIfNotAlready()}); this method clears the flag only when it pauses.
     */
    private void drainEvents() {
        // Nothing more is delivered once the stream has failed or completed.
        if (this.isDone) {
            return;
        }
        Queue<Object> queue = this.eventsToDeliver;
        synchronized (queue) {
            // The completion sentinel at the head means every real event before it was handed off.
            if (this.eventsToDeliver.peek() == ON_COMPLETE_EVENT) {
                this.onEventComplete();
                return;
            }
            if (this.eventsToDeliver.isEmpty() || this.remainingDemand.get() == 0L) {
                // Pause: no event to deliver, or the subscriber has not asked for more.
                this.isDelivering.compareAndSet(true, false);
                // Demand left but queue empty: pull more bytes from upstream.
                if (this.remainingDemand.get() > 0L) {
                    this.requestDataIfNotAlready();
                }
            } else {
                Object event = this.eventsToDeliver.remove();
                this.remainingDemand.decrementAndGet();
                // Deliver on the executor, then run the next step on the executor as well.
                // NOTE(review): thenRunAsync only fires when deliverEvent completes normally; if the
                // subscriber's onNext throws, no further step is scheduled and isDelivering stays set.
                CompletableFuture.runAsync(() -> this.deliverEvent(event), this.executor).thenRunAsync(this::drainEvents, this.executor);
            }
        }
    }

    /**
     * Passes one queued event to the registered subscriber's {@code onNext}.
     *
     * @param event an element taken from {@code eventsToDeliver}; the queue is typed {@code Object}
     *        because it also carries the completion sentinel, so the event type must be restored here
     */
    @SuppressWarnings("unchecked") // Only values produced by eventResponseHandler (? extends EventT) reach this method.
    private void deliverEvent(Object event) {
        this.subscriberRef.get().onNext((EventT) event);
    }

    /**
     * Creates an empty builder for {@link EventStreamAsyncResponseTransformer}.
     *
     * @param <ResponseT> type of the initial response
     * @param <EventT> base type of the streamed events
     * @return a new builder with no values set
     */
    public static <ResponseT, EventT> Builder<ResponseT, EventT> builder() {
        // Diamond instead of a raw type so the result is checked against the declared return type.
        return new Builder<>();
    }

    /**
     * Fluent builder for {@link EventStreamAsyncResponseTransformer}. Every setter stores its
     * argument and returns this builder; no value is validated or defaulted.
     *
     * @param <ResponseT> type of the initial response
     * @param <EventT> base type of the streamed events
     */
    public static final class Builder<ResponseT, EventT> {
        private EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler;
        private HttpResponseHandler<? extends ResponseT> initialResponseHandler;
        private HttpResponseHandler<? extends EventT> eventResponseHandler;
        private HttpResponseHandler<? extends Throwable> exceptionResponseHandler;
        private Executor executor;
        private CompletableFuture<Void> future;
        private String serviceName;

        /** Instances are obtained through {@link EventStreamAsyncResponseTransformer#builder()}. */
        private Builder() {
        }

        /**
         * @param eventStreamResponseHandler handler notified of the response, event publisher, errors and completion
         * @return this builder
         */
        public Builder<ResponseT, EventT> eventStreamResponseHandler(EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler) {
            this.eventStreamResponseHandler = eventStreamResponseHandler;
            return this;
        }

        /**
         * @param initialResponseHandler unmarshaller for the {@code initial-response} message
         * @return this builder
         */
        public Builder<ResponseT, EventT> initialResponseHandler(HttpResponseHandler<? extends ResponseT> initialResponseHandler) {
            this.initialResponseHandler = initialResponseHandler;
            return this;
        }

        /**
         * @param eventResponseHandler unmarshaller for event messages
         * @return this builder
         */
        public Builder<ResponseT, EventT> eventResponseHandler(HttpResponseHandler<? extends EventT> eventResponseHandler) {
            this.eventResponseHandler = eventResponseHandler;
            return this;
        }

        /**
         * @param exceptionResponseHandler unmarshaller for error/exception messages
         * @return this builder
         */
        public Builder<ResponseT, EventT> exceptionResponseHandler(HttpResponseHandler<? extends Throwable> exceptionResponseHandler) {
            this.exceptionResponseHandler = exceptionResponseHandler;
            return this;
        }

        /**
         * @param executor executor used to deliver events to the subscriber
         * @return this builder
         */
        public Builder<ResponseT, EventT> executor(Executor executor) {
            this.executor = executor;
            return this;
        }

        /**
         * @param future future the transformer completes when the event stream finishes
         * @return this builder
         */
        public Builder<ResponseT, EventT> future(CompletableFuture<Void> future) {
            this.future = future;
            return this;
        }

        /**
         * @param serviceName service name supplied to the exception unmarshaller
         * @return this builder
         */
        public Builder<ResponseT, EventT> serviceName(String serviceName) {
            this.serviceName = serviceName;
            return this;
        }

        /**
         * @return a new transformer configured with the values currently held by this builder
         */
        public EventStreamAsyncResponseTransformer<ResponseT, EventT> build() {
            // Diamond instead of a raw type so the constructor arguments are checked against ResponseT/EventT.
            return new EventStreamAsyncResponseTransformer<>(
                this.eventStreamResponseHandler,
                this.initialResponseHandler,
                this.eventResponseHandler,
                this.exceptionResponseHandler,
                this.executor,
                this.future,
                this.serviceName);
        }
    }

    /**
     * Publisher of decoded events handed to the user's handler. It supports exactly one subscriber;
     * the subscription it issues feeds demand into the enclosing transformer.
     */
    private class EventPublisher
    implements SdkPublisher<EventT> {
        /** Upstream byte subscription, cancelled when the event subscriber cancels. */
        private final Subscription dataSubscription;

        private EventPublisher(Subscription dataSubscription) {
            this.dataSubscription = dataSubscription;
        }

        /**
         * Registers the single event subscriber and hands it a subscription.
         *
         * @param subscriber the event subscriber
         * @throws IllegalStateException if a subscriber has already been registered
         */
        public void subscribe(Subscriber<? super EventT> subscriber) {
            if (!EventStreamAsyncResponseTransformer.this.subscriberRef.compareAndSet(null, subscriber)) {
                log.error("Event stream publishers can only be subscribed to once.");
                throw new IllegalStateException("This publisher may only be subscribed to once");
            }
            subscriber.onSubscribe(new Subscription(){

                /**
                 * Adds {@code l} to the outstanding demand, then either drains already-decoded
                 * events or asks upstream for more bytes. Ignored once the stream is done.
                 *
                 * NOTE(review): a non-positive {@code l} is not rejected here — confirm whether
                 * that should be signalled as an error.
                 */
                public void request(long l) {
                    if (EventStreamAsyncResponseTransformer.this.isDone) {
                        return;
                    }
                    synchronized (EventStreamAsyncResponseTransformer.this.eventsToDeliver) {
                        // Saturate instead of wrapping: a wrapped (negative) demand would make the
                        // "demand > 0" checks fail forever and stall the stream.
                        EventStreamAsyncResponseTransformer.this.remainingDemand.accumulateAndGet(l, (current, requested) -> {
                            long sum = current + requested;
                            boolean overflowed = current > 0L && requested > 0L && sum < 0L;
                            return overflowed ? Long.MAX_VALUE : sum;
                        });
                        if (!EventStreamAsyncResponseTransformer.this.eventsToDeliver.isEmpty()) {
                            EventStreamAsyncResponseTransformer.this.drainEventsIfNotAlready();
                        } else {
                            EventStreamAsyncResponseTransformer.this.requestDataIfNotAlready();
                        }
                    }
                }

                /** Cancels the upstream byte subscription. */
                public void cancel() {
                    EventPublisher.this.dataSubscription.cancel();
                }
            });
        }
    }

    /**
     * Subscriber to the raw byte publisher. It feeds every buffer to the decoder and keeps the
     * delivery loop or the upstream requests going, depending on what the decoder produced.
     */
    private class ByteSubscriber
    implements Subscriber<ByteBuffer> {
        /** Completed with the upstream subscription as soon as it is known. */
        private final CompletableFuture<Subscription> dataSubscriptionFuture;

        private ByteSubscriber(CompletableFuture<Subscription> dataSubscriptionFuture) {
            this.dataSubscriptionFuture = dataSubscriptionFuture;
        }

        /** Publishes the upstream subscription to the transformer, then completes the future with it. */
        public void onSubscribe(Subscription subscription) {
            EventStreamAsyncResponseTransformer.this.dataSubscription.set(subscription);
            this.dataSubscriptionFuture.complete(subscription);
        }

        /**
         * Decodes one buffer. If no event is queued afterwards and the subscriber still has demand,
         * another buffer is requested; if events are queued, the in-flight request flag is cleared
         * and a drain is started. Buffers arriving after the stream is done are dropped.
         */
        public void onNext(ByteBuffer buffer) {
            if (EventStreamAsyncResponseTransformer.this.isDone) {
                return;
            }
            synchronized (EventStreamAsyncResponseTransformer.this.eventsToDeliver) {
                // Decoding may invoke handleMessage, which can enqueue events.
                EventStreamAsyncResponseTransformer.this.decoder.feed(BinaryUtils.copyBytesFrom(buffer));

                if (EventStreamAsyncResponseTransformer.this.eventsToDeliver.isEmpty()) {
                    boolean subscriberWantsMore = EventStreamAsyncResponseTransformer.this.remainingDemand.get() > 0L;
                    if (subscriberWantsMore) {
                        EventStreamAsyncResponseTransformer.this.dataSubscription.get().request(1L);
                    }
                    return;
                }

                EventStreamAsyncResponseTransformer.this.isRequesting.compareAndSet(true, false);
                EventStreamAsyncResponseTransformer.this.drainEventsIfNotAlready();
            }
        }

        public void onError(Throwable throwable) {
            // No action is taken here.
            // NOTE(review): presumably failures reach the transformer via exceptionOccurred — confirm.
        }

        public void onComplete() {
            // No action is taken here.
            // NOTE(review): presumably completion reaches the transformer via complete() — confirm.
        }
    }
}

