/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kinesis.provisioning;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisConsumerDestination;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisProducerDestination;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class KinesisStreamProvisioner
implements ProvisioningProvider<ExtendedConsumerProperties<KinesisConsumerProperties>, ExtendedProducerProperties<KinesisProducerProperties>> {
    private static final Log logger = LogFactory.getLog(KinesisStreamProvisioner.class);
    private final KinesisAsyncClient amazonKinesis;
    private final KinesisBinderConfigurationProperties configurationProperties;

    public KinesisStreamProvisioner(KinesisAsyncClient amazonKinesis, KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) {
        Assert.notNull((Object)amazonKinesis, (String)"'amazonKinesis' must not be null");
        Assert.notNull((Object)kinesisBinderConfigurationProperties, (String)"'kinesisBinderConfigurationProperties' must not be null");
        this.amazonKinesis = amazonKinesis;
        this.configurationProperties = kinesisBinderConfigurationProperties;
    }

    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<KinesisProducerProperties> properties) throws ProvisioningException {
        KinesisProducerProperties kinesisProducerProperties;
        if (logger.isInfoEnabled()) {
            logger.info((Object)("Using Kinesis stream for outbound: " + name));
        }
        if ((kinesisProducerProperties = (KinesisProducerProperties)properties.getExtension()).isEmbedHeaders()) {
            properties.setHeaderMode(HeaderMode.none);
        }
        return new KinesisProducerDestination(name, this.createOrUpdate(name, properties.getPartitionCount()));
    }

    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) throws ProvisioningException {
        String[] streams;
        KinesisConsumerProperties kinesisConsumerProperties = (KinesisConsumerProperties)properties.getExtension();
        if (kinesisConsumerProperties.isEmbedHeaders()) {
            properties.setHeaderMode(HeaderMode.none);
        }
        int shardCount = properties.getInstanceCount() * properties.getConcurrency();
        if (!properties.isMultiplex()) {
            List<Shard> shardList = this.provisionKinesisConsumerDestination(name, shardCount);
            return new KinesisConsumerDestination(name, shardList);
        }
        for (String stream : streams = StringUtils.commaDelimitedListToStringArray((String)name)) {
            this.provisionKinesisConsumerDestination(stream.trim(), shardCount);
        }
        return new KinesisConsumerDestination(name, List.of());
    }

    private List<Shard> provisionKinesisConsumerDestination(String stream, int shards) {
        if (logger.isInfoEnabled()) {
            logger.info((Object)("Using Kinesis stream for inbound: " + stream));
        }
        return this.createOrUpdate(stream, shards);
    }

    private List<Shard> createOrUpdate(String stream, int shards) {
        List<Shard> shardList;
        try {
            shardList = this.getShardList(stream).join();
        }
        catch (CompletionException ex) {
            Throwable cause = ex.getCause();
            if (cause instanceof ResourceNotFoundException) {
                if (!this.configurationProperties.isAutoCreateStream()) {
                    throw new ProvisioningException("The stream [" + stream + "] was not found and auto creation is disabled.", cause);
                }
                if (logger.isInfoEnabled()) {
                    logger.info((Object)("Stream '" + stream + "' not found. Create one..."));
                }
                shardList = this.createStream(stream, shards);
            }
            throw new ProvisioningException("Cannot retrieve shards information for stream [" + stream + "].", cause);
        }
        int effectiveShardCount = Math.max(this.configurationProperties.getMinShardCount(), shards);
        if (shardList.size() < effectiveShardCount && this.configurationProperties.isAutoAddShards()) {
            return this.updateShardCount(stream, shardList.size(), effectiveShardCount);
        }
        return shardList;
    }

    private CompletableFuture<List<Shard>> getShardList(String stream) {
        return ((CompletableFuture)this.amazonKinesis.describeStreamSummary(request -> request.streamName(stream)).thenCompose(reply -> this.amazonKinesis.listShards(request -> request.streamName(stream)))).thenApply(ListShardsResponse::shards);
    }

    private List<Shard> createStream(String streamName, int shards) {
        try {
            return (List)((CompletableFuture)((CompletableFuture)this.amazonKinesis.createStream(request -> request.streamName(streamName).shardCount(Integer.valueOf(Math.max(this.configurationProperties.getMinShardCount(), shards)))).thenCompose(reply -> this.waitForStreamToBecomeActive(streamName))).thenCompose(reply -> this.getShardList(streamName))).join();
        }
        catch (Exception ex) {
            throw new ProvisioningException("Cannot create stream [" + streamName + "].", (Throwable)ex);
        }
    }

    private CompletableFuture<WaiterResponse<DescribeStreamResponse>> waitForStreamToBecomeActive(String streamName) {
        return this.amazonKinesis.waiter().waitUntilStreamExists(request -> request.streamName(streamName), waiter -> waiter.maxAttempts(Integer.valueOf(this.configurationProperties.getDescribeStreamRetries())).backoffStrategy((BackoffStrategy)FixedDelayBackoffStrategy.create((Duration)Duration.ofMillis(this.configurationProperties.getDescribeStreamBackoff()))));
    }

    private List<Shard> updateShardCount(String streamName, int shardCount, int targetCount) {
        if (logger.isInfoEnabled()) {
            logger.info((Object)("Stream [" + streamName + "] has [" + shardCount + "] shards compared to a target configuration of [" + targetCount + "], creating shards..."));
        }
        return (List)((CompletableFuture)((CompletableFuture)this.amazonKinesis.updateShardCount(request -> request.streamName(streamName).targetShardCount(Integer.valueOf(targetCount)).scalingType(ScalingType.UNIFORM_SCALING)).thenCompose(reply -> this.waitForStreamToBecomeActive(streamName))).thenCompose(reply -> this.getShardList(streamName))).join();
    }
}

