/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.awscore.eventstream;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.awscore.eventstream.EventStreamResponseHandler;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.http.HttpResponse;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.core.internal.util.ThrowableUtils;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.FunctionalUtils;
import software.amazon.eventstream.HeaderValue;
import software.amazon.eventstream.Message;
import software.amazon.eventstream.MessageDecoder;

/**
 * {@link AsyncResponseTransformer} that decodes an event-stream encoded response body and dispatches the decoded
 * messages to an {@link EventStreamResponseHandler}.
 *
 * <p>Raw byte chunks are fed into a {@link MessageDecoder}. Each decoded message is routed by its
 * {@code :message-type} header:
 * <ul>
 *   <li>{@code event} with {@code :event-type} equal to {@code initial-response}: unmarshalled with the initial
 *       response handler and passed to {@link EventStreamResponseHandler#responseReceived}.</li>
 *   <li>any other {@code event}: unmarshalled with the event handler and delivered to the subscriber of the
 *       event publisher.</li>
 *   <li>{@code error} or {@code exception}: unmarshalled with the exception handler and reported through
 *       {@link #exceptionOccurred(Throwable)}.</li>
 * </ul>
 *
 * @param <ResponseT> type produced by the initial response handler
 * @param <EventT> type produced by the event handler
 */
@SdkProtectedApi
public class EventStreamAsyncResponseTransformer<ResponseT, EventT>
        implements AsyncResponseTransformer<SdkResponse, Void> {
    private static final Logger log = LoggerFactory.getLogger(EventStreamAsyncResponseTransformer.class);
    private static final ExecutionAttributes EMPTY_EXECUTION_ATTRIBUTES = new ExecutionAttributes();
    private static final String REQUEST_ID_HEADER = "x-amzn-RequestId";
    private static final String EXTENDED_REQUEST_ID_HEADER = "x-amz-id-2";

    private final EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseTransformer;
    private final HttpResponseHandler<? extends ResponseT> initialResponseUnmarshaller;
    private final HttpResponseHandler<? extends EventT> eventUnmarshaller;
    private final HttpResponseHandler<? extends Throwable> exceptionUnmarshaller;
    private final CompletableFuture<Void> future;
    /** Events requested by the event subscriber that have not yet been delivered. */
    private final AtomicLong remainingDemand = new AtomicLong(0L);
    /** The single subscriber of the event publisher; {@code null} until someone subscribes. */
    private final AtomicReference<Subscriber<? super EventT>> subscriberRef = new AtomicReference<>();
    private final MessageDecoder decoder = this.createDecoder();
    private final String serviceName;
    /** Set once either {@link #complete()} or {@link #exceptionOccurred(Throwable)} has run; guarded by {@code this}. */
    private volatile boolean isDone = false;
    /** The throwable passed to {@link #exceptionOccurred(Throwable)}, if any. */
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private String requestId = null;
    private String extendedRequestId = null;

    /**
     * Creates a transformer with a fresh completion future and an empty service name.
     *
     * @param eventStreamResponseTransformer handler notified of the initial response, the event publisher,
     *                                       completion and errors
     * @param initialResponseUnmarshaller unmarshaller for the {@code initial-response} event
     * @param eventUnmarshaller unmarshaller for all other events
     * @param exceptionUnmarshaller unmarshaller for {@code error} / {@code exception} messages
     */
    public EventStreamAsyncResponseTransformer(EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseTransformer, HttpResponseHandler<? extends ResponseT> initialResponseUnmarshaller, HttpResponseHandler<? extends EventT> eventUnmarshaller, HttpResponseHandler<? extends Throwable> exceptionUnmarshaller) {
        this(eventStreamResponseTransformer, initialResponseUnmarshaller, eventUnmarshaller, exceptionUnmarshaller, new CompletableFuture<Void>(), "");
    }

    private EventStreamAsyncResponseTransformer(EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler, HttpResponseHandler<? extends ResponseT> initialResponseHandler, HttpResponseHandler<? extends EventT> eventResponseHandler, HttpResponseHandler<? extends Throwable> exceptionResponseHandler, CompletableFuture<Void> future, String serviceName) {
        this.eventStreamResponseTransformer = eventStreamResponseHandler;
        this.initialResponseUnmarshaller = initialResponseHandler;
        this.eventUnmarshaller = eventResponseHandler;
        this.exceptionUnmarshaller = exceptionResponseHandler;
        this.future = future;
        this.serviceName = serviceName;
    }

    /**
     * Records the request id headers of the HTTP response so they can be copied onto every response adapted
     * from an event-stream message.
     *
     * @param response the initial SDK response; ignored when it or its HTTP response is {@code null}
     */
    @Override
    public void responseReceived(SdkResponse response) {
        if (response == null || response.sdkHttpResponse() == null) {
            return;
        }
        this.requestId = response.sdkHttpResponse().firstMatchingHeader(REQUEST_ID_HEADER).orElse(null);
        this.extendedRequestId = response.sdkHttpResponse().firstMatchingHeader(EXTENDED_REQUEST_ID_HEADER).orElse(null);
    }

    /**
     * Subscribes to the raw byte publisher and, once the byte subscription is available, hands an event
     * publisher to the response handler.
     *
     * <p>If {@link EventStreamResponseHandler#onEventStream} throws, the byte subscription is cancelled and the
     * throwable is reported through {@link #exceptionOccurred(Throwable)}.
     *
     * @param publisher publisher of the raw response body chunks
     */
    @Override
    public void onStream(SdkPublisher<ByteBuffer> publisher) {
        synchronized (this) {
            // Reset the terminal flag so this stream can be completed or failed.
            this.isDone = false;
        }
        // Typed future: with a raw CompletableFuture the callback parameter would be Object and
        // Subscription#cancel() would not be reachable without a cast.
        CompletableFuture<Subscription> dataSubscriptionFuture = new CompletableFuture<>();
        publisher.subscribe(new ByteSubscriber(dataSubscriptionFuture));
        dataSubscriptionFuture.thenAccept(dataSubscription -> {
            SdkPublisher<EventT> eventPublisher = new EventPublisher(dataSubscription);
            try {
                this.eventStreamResponseTransformer.onEventStream(eventPublisher);
            } catch (Throwable t) {
                dataSubscription.cancel();
                this.exceptionOccurred(t);
            }
        });
    }

    /**
     * Marks the stream as failed and notifies the event subscriber (if any) and the response handler.
     * Only the first terminal signal takes effect; calls made after the stream is done are ignored.
     *
     * @param throwable the failure to report
     */
    @Override
    public void exceptionOccurred(Throwable throwable) {
        synchronized (this) {
            if (this.isDone) {
                return;
            }
            this.isDone = true;
            this.error.set(throwable);
            if (this.subscriberRef.get() != null) {
                FunctionalUtils.runAndLogError(log, "Error thrown from Subscriber#onError, ignoring.", () -> this.subscriberRef.get().onError(throwable));
            }
            this.eventStreamResponseTransformer.exceptionOccurred(throwable);
        }
    }

    /**
     * Marks the stream as successfully finished: notifies the event subscriber (if any), completes the future
     * and calls {@link EventStreamResponseHandler#complete()}.
     *
     * @return always {@code null}
     * @throws RuntimeException as produced by {@link ThrowableUtils#failure} from the recorded error when the
     *                          stream has already reached a terminal state
     */
    @Override
    public Void complete() {
        synchronized (this) {
            if (this.isDone) {
                throw ThrowableUtils.failure(this.error.get());
            }
            this.isDone = true;
            if (this.subscriberRef.get() != null) {
                FunctionalUtils.runAndLogError(log, "Error thrown from Subscriber#onComplete, ignoring.", () -> this.subscriberRef.get().onComplete());
            }
            this.future.complete(null);
            this.eventStreamResponseTransformer.complete();
            return null;
        }
    }

    /** Creates the decoder that routes every fully decoded message to {@link #handleMessage(Message)}. */
    private MessageDecoder createDecoder() {
        return new MessageDecoder(this::handleMessage);
    }

    /**
     * Dispatches one decoded message according to its {@code :message-type} header.
     *
     * @throws SdkClientException wrapping any exception raised while unmarshalling or dispatching
     */
    private void handleMessage(Message m) {
        try {
            if (this.isEvent(m)) {
                if (m.getHeaders().get(":event-type").getString().equals("initial-response")) {
                    this.eventStreamResponseTransformer.responseReceived(this.initialResponseUnmarshaller.handle(this.adaptMessageToResponse(m, false), EMPTY_EXECUTION_ATTRIBUTES));
                } else {
                    // One unit of subscriber demand is consumed per delivered event.
                    this.remainingDemand.decrementAndGet();
                    this.subscriberRef.get().onNext(this.eventUnmarshaller.handle(this.adaptMessageToResponse(m, false), EMPTY_EXECUTION_ATTRIBUTES));
                }
            } else if (this.isError(m) || this.isException(m)) {
                HttpResponse errorResponse = this.adaptMessageToResponse(m, true);
                Throwable exception = this.exceptionUnmarshaller.handle(errorResponse, new ExecutionAttributes().putAttribute(SdkExecutionAttribute.SERVICE_NAME, this.serviceName));
                FunctionalUtils.runAndLogError(log, "Error thrown from exceptionOccurred, ignoring.", () -> this.exceptionOccurred(exception));
            }
        } catch (Exception e) {
            throw SdkClientException.builder().cause(e).build();
        }
    }

    /** Returns the string value of the {@code :message-type} header of the given message. */
    private static String messageType(Message m) {
        return m.getHeaders().get(":message-type").getString();
    }

    private boolean isEvent(Message m) {
        return "event".equals(messageType(m));
    }

    private boolean isError(Message m) {
        return "error".equals(messageType(m));
    }

    private boolean isException(Message m) {
        return "exception".equals(messageType(m));
    }

    /**
     * Wraps an event-stream message in an {@link HttpResponse} so the regular unmarshallers can process it.
     * The payload becomes the content, message headers become HTTP headers, and the recorded request ids
     * (when present) are added.
     *
     * @param m the decoded message
     * @param isException {@code true} to use status code 500, {@code false} to use 200
     */
    private HttpResponse adaptMessageToResponse(Message m, boolean isException) {
        HttpResponse response = new HttpResponse(null);
        response.setContent(new ByteArrayInputStream(m.getPayload()));
        m.getHeaders().forEach((k, v) -> response.addHeader(k, v.getString()));
        if (this.requestId != null) {
            response.addHeader(REQUEST_ID_HEADER, this.requestId);
        }
        if (this.extendedRequestId != null) {
            response.addHeader(EXTENDED_REQUEST_ID_HEADER, this.extendedRequestId);
        }
        response.setStatusCode(isException ? 500 : 200);
        return response;
    }

    /**
     * @return a new, empty builder
     */
    public static <ResponseT, EventT> Builder<ResponseT, EventT> builder() {
        return new Builder<>();
    }

    /**
     * Builder for {@link EventStreamAsyncResponseTransformer}.
     */
    public static final class Builder<ResponseT, EventT> {
        private EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler;
        private HttpResponseHandler<? extends ResponseT> initialResponseHandler;
        private HttpResponseHandler<? extends EventT> eventResponseHandler;
        private HttpResponseHandler<? extends Throwable> exceptionResponseHandler;
        private CompletableFuture<Void> future;
        private String serviceName;

        private Builder() {
        }

        public Builder<ResponseT, EventT> eventStreamResponseHandler(EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler) {
            this.eventStreamResponseHandler = eventStreamResponseHandler;
            return this;
        }

        public Builder<ResponseT, EventT> initialResponseHandler(HttpResponseHandler<? extends ResponseT> initialResponseHandler) {
            this.initialResponseHandler = initialResponseHandler;
            return this;
        }

        public Builder<ResponseT, EventT> eventResponseHandler(HttpResponseHandler<? extends EventT> eventResponseHandler) {
            this.eventResponseHandler = eventResponseHandler;
            return this;
        }

        public Builder<ResponseT, EventT> exceptionResponseHandler(HttpResponseHandler<? extends Throwable> exceptionResponseHandler) {
            this.exceptionResponseHandler = exceptionResponseHandler;
            return this;
        }

        /**
         * Accepts an executor for call-site compatibility; the argument is neither stored nor used.
         */
        public Builder<ResponseT, EventT> executor(Executor executor) {
            return this;
        }

        /**
         * @param future future completed with {@code null} when the transformer's {@code complete()} succeeds
         */
        public Builder<ResponseT, EventT> future(CompletableFuture<Void> future) {
            this.future = future;
            return this;
        }

        public Builder<ResponseT, EventT> serviceName(String serviceName) {
            this.serviceName = serviceName;
            return this;
        }

        /**
         * Builds the transformer. When no future was supplied a fresh one is used, matching the public
         * constructor, so that {@code complete()} never dereferences a {@code null} future.
         */
        public EventStreamAsyncResponseTransformer<ResponseT, EventT> build() {
            CompletableFuture<Void> completionFuture = this.future != null ? this.future : new CompletableFuture<Void>();
            return new EventStreamAsyncResponseTransformer<>(this.eventStreamResponseHandler, this.initialResponseHandler, this.eventResponseHandler, this.exceptionResponseHandler, completionFuture, this.serviceName);
        }
    }

    /**
     * Publisher of unmarshalled events handed to the response handler. It supports exactly one subscriber and
     * translates that subscriber's demand into requests on the underlying byte subscription.
     */
    private class EventPublisher
            implements SdkPublisher<EventT> {
        private final Subscription dataSubscription;

        private EventPublisher(Subscription dataSubscription) {
            this.dataSubscription = dataSubscription;
        }

        /**
         * @throws IllegalStateException if a subscriber has already been registered
         */
        @Override
        public void subscribe(Subscriber<? super EventT> subscriber) {
            if (!EventStreamAsyncResponseTransformer.this.subscriberRef.compareAndSet(null, subscriber)) {
                log.error("Event stream publishers can only be subscribed to once.");
                throw new IllegalStateException("This publisher may only be subscribed to once");
            }
            subscriber.onSubscribe(new Subscription() {

                @Override
                public void request(long l) {
                    // Record the demand BEFORE asking for bytes: if the chunk arrives before the counter is
                    // updated, ByteSubscriber#onNext would see no outstanding demand and stop requesting data.
                    EventStreamAsyncResponseTransformer.this.remainingDemand.addAndGet(l);
                    EventPublisher.this.dataSubscription.request(1L);
                }

                @Override
                public void cancel() {
                    EventPublisher.this.dataSubscription.cancel();
                }
            });
        }
    }

    /**
     * Subscriber of the raw byte stream. Feeds every chunk into the decoder and keeps requesting chunks while
     * the event subscriber still has unfulfilled demand.
     */
    private class ByteSubscriber
            implements Subscriber<ByteBuffer> {
        private final CompletableFuture<Subscription> dataSubscriptionFuture;
        private Subscription subscription;

        private ByteSubscriber(CompletableFuture<Subscription> dataSubscriptionFuture) {
            this.dataSubscriptionFuture = dataSubscriptionFuture;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            // Store the subscription before completing the future: completing it may synchronously run
            // callbacks that trigger onNext, which reads this field.
            this.subscription = subscription;
            this.dataSubscriptionFuture.complete(subscription);
        }

        @Override
        public void onNext(ByteBuffer buffer) {
            EventStreamAsyncResponseTransformer.this.decoder.feed(BinaryUtils.copyBytesFrom(buffer));
            // Demand not yet satisfied by the decoded messages: pull the next chunk.
            if (EventStreamAsyncResponseTransformer.this.remainingDemand.get() > 0L) {
                this.subscription.request(1L);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            // Intentionally empty: this subscriber does not propagate byte-stream errors itself.
        }

        @Override
        public void onComplete() {
            // Intentionally empty: this subscriber does not propagate byte-stream completion itself.
        }
    }
}

