/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.retrieval.fanout;

import java.util.concurrent.ExecutionException;
import lombok.NonNull;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.ConsumerRegistration;

/**
 * Resolves the ARN of a named Kinesis stream consumer, registering the consumer when a
 * describe call reports that it does not exist, and then waits until the consumer reports
 * {@link ConsumerStatus#ACTIVE}.
 *
 * <p>The stream ARN and the consumer ARN are cached in plain instance fields after the first
 * successful lookup. No synchronization is performed around those fields.
 */
@KinesisClientInternalApi
public class FanOutConsumerRegistration
implements ConsumerRegistration {
    private static final Logger log = LoggerFactory.getLogger(FanOutConsumerRegistration.class);

    /** Upper bound (exclusive), in milliseconds, of the random jitter added to every backoff. */
    private static final double BACKOFF_JITTER_MILLIS = 100.0;

    @NonNull
    private final KinesisAsyncClient kinesisClient;
    private final String streamName;
    @NonNull
    private final String streamConsumerName;
    private final int maxDescribeStreamSummaryRetries;
    private final int maxDescribeStreamConsumerRetries;
    private final int registerStreamConsumerRetries;
    private final long retryBackoffMillis;
    /** Cached result of DescribeStreamSummary; empty until {@link #streamArn()} succeeds. */
    private String streamArn;
    /** Cached consumer ARN; empty until a describe or register call has produced one. */
    private String streamConsumerArn;

    /**
     * Creates a registration helper.
     *
     * @param kinesisClient client used for every Kinesis call; must not be {@code null}
     * @param streamName name of the stream, used to look up the stream ARN
     * @param streamConsumerName name of the consumer to describe or register; must not be {@code null}
     * @param maxDescribeStreamSummaryRetries maximum attempts of DescribeStreamSummary when throttled
     * @param maxDescribeStreamConsumerRetries maximum attempts of each DescribeStreamConsumer call when
     *        throttled, and also the maximum number of status polls while waiting for ACTIVE
     * @param registerStreamConsumerRetries maximum attempts of RegisterStreamConsumer when throttled
     * @param retryBackoffMillis base sleep between attempts; a random jitter below 100 ms is added
     * @throws NullPointerException if {@code kinesisClient} or {@code streamConsumerName} is {@code null}
     */
    public FanOutConsumerRegistration(@NonNull KinesisAsyncClient kinesisClient, String streamName, @NonNull String streamConsumerName, int maxDescribeStreamSummaryRetries, int maxDescribeStreamConsumerRetries, int registerStreamConsumerRetries, long retryBackoffMillis) {
        if (kinesisClient == null) {
            throw new NullPointerException("kinesisClient");
        }
        if (streamConsumerName == null) {
            throw new NullPointerException("streamConsumerName");
        }
        this.kinesisClient = kinesisClient;
        this.streamName = streamName;
        this.streamConsumerName = streamConsumerName;
        this.maxDescribeStreamSummaryRetries = maxDescribeStreamSummaryRetries;
        this.maxDescribeStreamConsumerRetries = maxDescribeStreamConsumerRetries;
        this.registerStreamConsumerRetries = registerStreamConsumerRetries;
        this.retryBackoffMillis = retryBackoffMillis;
    }

    /**
     * Returns the consumer ARN, looking it up or registering the consumer on first use.
     *
     * <p>When no ARN is cached: the consumer is described; if it is not found it is registered
     * (retrying while throttled); if registration reports the resource is in use the consumer is
     * described again. Finally the method waits for the consumer to become ACTIVE.
     *
     * @return the cached or newly resolved consumer ARN
     * @throws DependencyException if registration stays throttled for every attempt, or if the
     *         calling thread is interrupted while waiting on a call or a backoff
     * @throws IllegalStateException if the consumer is not ACTIVE after all status polls
     */
    @Override
    public String getOrCreateStreamConsumerArn() throws DependencyException {
        if (!StringUtils.isEmpty(this.streamConsumerArn)) {
            return this.streamConsumerArn;
        }

        DescribeStreamConsumerResponse response = null;
        try {
            response = this.describeStreamConsumer();
        } catch (ResourceNotFoundException e) {
            // Expected when the consumer has never been registered; fall through to registration.
            log.info("StreamConsumer not found, need to create it.");
        }

        if (response == null) {
            LimitExceededException finalException = null;
            try {
                for (int retries = this.registerStreamConsumerRetries; retries > 0; --retries) {
                    // Reset so that only a throttle on the very last attempt is reported.
                    finalException = null;
                    try {
                        RegisterStreamConsumerResponse registerResponse = this.registerStreamConsumer();
                        this.streamConsumerArn(registerResponse.consumer().consumerARN());
                        break;
                    } catch (LimitExceededException e) {
                        log.debug("RegisterStreamConsumer call got throttled will retry.");
                        finalException = e;
                    }
                }
                if (finalException != null) {
                    throw new DependencyException(finalException);
                }
            } catch (ResourceInUseException e) {
                log.debug("Got ResourceInUseException consumer exists, will call DescribeStreamConsumer again.");
                response = this.describeStreamConsumer();
            }
        }

        if (response != null) {
            this.streamConsumerArn(response.consumerDescription().consumerARN());
        }
        this.waitForActive();
        return this.streamConsumerArn;
    }

    /**
     * Issues a single RegisterStreamConsumer call and blocks for its result.
     *
     * @return the service response
     * @throws DependencyException if the calling thread is interrupted while waiting
     */
    private RegisterStreamConsumerResponse registerStreamConsumer() throws DependencyException {
        AWSExceptionManager exceptionManager = this.createExceptionManager();
        try {
            RegisterStreamConsumerRequest request = KinesisRequestsBuilder.registerStreamConsumerRequestBuilder()
                    .streamARN(this.streamArn())
                    .consumerName(this.streamConsumerName)
                    .build();
            return this.kinesisClient.registerStreamConsumer(request).get();
        } catch (ExecutionException e) {
            throw exceptionManager.apply(e.getCause());
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DependencyException(e);
        }
    }

    /**
     * Describes the consumer, addressing it by ARN when one is cached and by stream ARN plus
     * consumer name otherwise. Throttled calls are retried via {@link #retryWhenThrottled}.
     */
    private DescribeStreamConsumerResponse describeStreamConsumer() throws DependencyException {
        DescribeStreamConsumerRequest.Builder requestBuilder = KinesisRequestsBuilder.describeStreamConsumerRequestBuilder();
        final DescribeStreamConsumerRequest request = StringUtils.isEmpty(this.streamConsumerArn)
                ? requestBuilder.streamARN(this.streamArn()).consumerName(this.streamConsumerName).build()
                : requestBuilder.consumerARN(this.streamConsumerArn).build();
        ServiceCallerSupplier<DescribeStreamConsumerResponse> dsc = () -> this.kinesisClient.describeStreamConsumer(request).get();
        return this.retryWhenThrottled(dsc, this.maxDescribeStreamConsumerRetries, "DescribeStreamConsumer");
    }

    /**
     * Polls the consumer status up to {@code maxDescribeStreamConsumerRetries} times, backing off
     * between polls, and returns as soon as the status is ACTIVE.
     *
     * @throws DependencyException if interrupted while polling or backing off
     * @throws IllegalStateException if the consumer is still not ACTIVE after the last poll
     */
    private void waitForActive() throws DependencyException {
        ConsumerStatus status = null;
        int remaining = this.maxDescribeStreamConsumerRetries;
        while (remaining > 0) {
            status = this.describeStreamConsumer().consumerDescription().consumerStatus();
            --remaining;
            if (ConsumerStatus.ACTIVE.equals(status)) {
                return;
            }
            if (remaining > 0) {
                // Give the service time to change the consumer's state before asking again;
                // polling back-to-back would burn through every attempt almost instantly.
                log.debug("StreamConsumer {} is not ACTIVE yet (status {}), will recheck after backoff.", this.streamConsumerName, status);
                this.backoff();
            }
        }
        String message = String.format("Status of StreamConsumer %s, was not ACTIVE after all retries. Was instead %s.", this.streamConsumerName, status);
        log.error(message);
        throw new IllegalStateException(message);
    }

    /** Returns the stream ARN, fetching it through DescribeStreamSummary on first use. */
    private String streamArn() throws DependencyException {
        if (StringUtils.isEmpty(this.streamArn)) {
            final DescribeStreamSummaryRequest request = KinesisRequestsBuilder.describeStreamSummaryRequestBuilder()
                    .streamName(this.streamName)
                    .build();
            ServiceCallerSupplier<String> dss = () -> this.kinesisClient.describeStreamSummary(request).get().streamDescriptionSummary().streamARN();
            this.streamArn = this.retryWhenThrottled(dss, this.maxDescribeStreamSummaryRetries, "DescribeStreamSummary");
        }
        return this.streamArn;
    }

    /**
     * Invokes {@code retriever} up to {@code maxRetries} times, backing off and retrying only
     * when the call fails with {@link LimitExceededException}.
     *
     * @param retriever the blocking service call to execute
     * @param maxRetries maximum number of attempts
     * @param apiName API name used in log and exception messages
     * @return the value produced by the first successful attempt
     * @throws DependencyException if the calling thread is interrupted during a call or a backoff
     * @throws LimitExceededException if every attempt was throttled
     * @throws IllegalStateException if no attempt was made ({@code maxRetries <= 0})
     */
    private <T> T retryWhenThrottled(@NonNull ServiceCallerSupplier<T> retriever, int maxRetries, @NonNull String apiName) throws DependencyException {
        if (retriever == null) {
            throw new NullPointerException("retriever");
        }
        if (apiName == null) {
            throw new NullPointerException("apiName");
        }
        AWSExceptionManager exceptionManager = this.createExceptionManager();
        LimitExceededException finalException = null;
        for (int retries = maxRetries; retries > 0; --retries) {
            try {
                return this.callOnce(retriever, exceptionManager);
            } catch (LimitExceededException e) {
                finalException = e;
                log.info("Throttled while calling {} API, will backoff.", apiName);
                if (retries > 1) {
                    // No point sleeping after the last attempt; the throttle is rethrown below.
                    this.backoff();
                }
            }
        }
        if (finalException == null) {
            throw new IllegalStateException(String.format("Finished all retries and no exception was caught while calling %s", apiName));
        }
        throw finalException;
    }

    /** Runs one attempt of {@code retriever}, translating its checked failures. */
    private <T> T callOnce(ServiceCallerSupplier<T> retriever, AWSExceptionManager exceptionManager) throws DependencyException {
        try {
            return retriever.get();
        } catch (ExecutionException e) {
            throw exceptionManager.apply(e.getCause());
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DependencyException(e);
        }
    }

    /**
     * Sleeps for {@code retryBackoffMillis} plus a random jitter below 100 ms.
     *
     * @throws DependencyException if the sleep is interrupted; the interrupt flag is restored first
     */
    private void backoff() throws DependencyException {
        try {
            Thread.sleep(this.retryBackoffMillis + (long)(Math.random() * BACKOFF_JITTER_MILLIS));
        } catch (InterruptedException ie) {
            log.debug("Sleep interrupted, shutdown invoked.");
            // Stop retrying: restore the flag and surface the interruption to the caller.
            Thread.currentThread().interrupt();
            throw new DependencyException(ie);
        }
    }

    /**
     * Builds an exception manager that hands the listed Kinesis exception types back unchanged,
     * so callers can catch them by their specific type.
     */
    private AWSExceptionManager createExceptionManager() {
        AWSExceptionManager exceptionManager = new AWSExceptionManager();
        exceptionManager.add(LimitExceededException.class, t -> t);
        exceptionManager.add(ResourceInUseException.class, t -> t);
        exceptionManager.add(ResourceNotFoundException.class, t -> t);
        exceptionManager.add(KinesisException.class, t -> t);
        return exceptionManager;
    }

    /** Fluent private setter for the cached consumer ARN. */
    private FanOutConsumerRegistration streamConsumerArn(String streamConsumerArn) {
        this.streamConsumerArn = streamConsumerArn;
        return this;
    }

    /** A supplier whose call blocks on a future and may therefore throw its checked exceptions. */
    @FunctionalInterface
    private static interface ServiceCallerSupplier<T> {
        public T get() throws ExecutionException, InterruptedException;
    }
}

