/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.retrieval.polling;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher;

/**
 * {@link GetRecordsRetrievalStrategy} that runs {@link KinesisDataFetcher#getRecords()} on an
 * {@link ExecutorService} and, while no call has completed, submits an additional call every
 * {@code retryGetRecordsInSeconds} seconds. The first call to complete successfully supplies the
 * response; every call submitted for that request is cancelled before {@link #getRecords(int)} returns.
 */
@KinesisClientInternalApi
public class AsynchronousGetRecordsRetrievalStrategy
implements GetRecordsRetrievalStrategy {
    private static final Logger log = LoggerFactory.getLogger(AsynchronousGetRecordsRetrievalStrategy.class);
    /** Keep-alive, in seconds, for pool threads above the core size. */
    private static final int TIME_TO_KEEP_ALIVE = 5;
    /** Core size of the pool created by {@link #buildExector(int, String)}. */
    private static final int CORE_THREAD_POOL_COUNT = 1;
    private final KinesisDataFetcher dataFetcher;
    private final ExecutorService executorService;
    private final int retryGetRecordsInSeconds;
    // Stored for reference; not read by any method of this class.
    private final String shardId;
    // Package-private so a different CompletionService can be supplied from within the package.
    final Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;

    /**
     * Creates a strategy backed by a dedicated thread pool built by {@link #buildExector(int, String)}.
     *
     * @param dataFetcher fetcher whose {@code getRecords()} is invoked on the pool; must not be null
     * @param retryGetRecordsInSeconds seconds to wait for a result before submitting another call
     * @param maxGetRecordsThreadPool maximum number of threads in the pool
     * @param shardId shard identifier, used in the worker thread names
     * @throws NullPointerException if {@code dataFetcher} is null
     */
    public AsynchronousGetRecordsRetrievalStrategy(@NonNull KinesisDataFetcher dataFetcher, int retryGetRecordsInSeconds, int maxGetRecordsThreadPool, String shardId) {
        this(dataFetcher, AsynchronousGetRecordsRetrievalStrategy.buildExector(maxGetRecordsThreadPool, shardId), retryGetRecordsInSeconds, shardId);
        // The delegating constructor call must come first, so the null check can only follow it.
        if (dataFetcher == null) {
            throw new NullPointerException("dataFetcher");
        }
    }

    /**
     * Creates a strategy that runs fetches on the supplied executor, wrapping it in a new
     * {@link ExecutorCompletionService} for every {@link #getRecords(int)} call.
     *
     * @param dataFetcher fetcher whose {@code getRecords()} is invoked on the executor
     * @param executorService executor that runs the fetch calls; shut down by {@link #shutdown()}
     * @param retryGetRecordsInSeconds seconds to wait for a result before submitting another call
     * @param shardId shard identifier
     */
    public AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, String shardId) {
        this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService), shardId);
    }

    /**
     * Creates a strategy with an explicit source of {@link CompletionService} instances.
     *
     * @param dataFetcher fetcher whose {@code getRecords()} is invoked through the completion service
     * @param executorService executor consulted by {@link #isShutdown()} and stopped by {@link #shutdown()}
     * @param retryGetRecordsInSeconds seconds to wait for a result before submitting another call
     * @param completionServiceSupplier called once per {@link #getRecords(int)} invocation
     * @param shardId shard identifier
     */
    AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier, String shardId) {
        this.dataFetcher = dataFetcher;
        this.executorService = executorService;
        this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
        this.completionServiceSupplier = completionServiceSupplier;
        this.shardId = shardId;
    }

    /**
     * Fetches records by racing one or more asynchronous {@code dataFetcher.getRecords()} calls.
     *
     * <p>Each loop iteration submits one more call and then waits up to
     * {@code retryGetRecordsInSeconds} seconds for any outstanding call to finish. A call that
     * failed is logged and the loop continues, except that an {@link ExpiredIteratorException}
     * is rethrown to the caller. All submitted calls are cancelled on the way out.
     *
     * @param maxRecords not used by this implementation
     * @return the accepted response of the first call that completed successfully, or
     *         {@code null} if the calling thread was interrupted while waiting
     * @throws IllegalStateException if the executor has already been shut down
     * @throws ExpiredIteratorException if a fetch call failed with that exception as its cause
     */
    @Override
    public GetRecordsResponse getRecords(int maxRecords) {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("Strategy has been shutdown");
        }
        GetRecordsResponse result = null;
        CompletionService<DataFetcherResult> completionService = this.completionServiceSupplier.get();
        HashSet<Future<DataFetcherResult>> futures = new HashSet<>();
        Callable<DataFetcherResult> retrieverCall = this.createRetrieverCallable();
        try {
            while (true) {
                try {
                    futures.add(completionService.submit(retrieverCall));
                }
                catch (RejectedExecutionException e) {
                    // Executor is saturated; keep waiting on the calls already in flight.
                    log.warn("Out of resources, unable to start additional requests.");
                }
                try {
                    Future<DataFetcherResult> resultFuture = completionService.poll(this.retryGetRecordsInSeconds, TimeUnit.SECONDS);
                    if (resultFuture != null) {
                        // accept() is only invoked on the result that is handed back to the caller.
                        result = resultFuture.get().accept();
                        break;
                    }
                    // Timed out with nothing completed: loop around and submit another call.
                }
                catch (ExecutionException e) {
                    if (e.getCause() instanceof ExpiredIteratorException) {
                        throw (ExpiredIteratorException)e.getCause();
                    }
                    log.error("ExecutionException thrown while trying to get records", e);
                }
                catch (InterruptedException e) {
                    log.error("Thread was interrupted", e);
                    // Re-assert the interrupt (after logging) so the caller can observe it;
                    // catching InterruptedException clears the thread's interrupt status.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        finally {
            // Stop every call submitted for this request, including the one that produced the result.
            futures.forEach(f -> f.cancel(true));
        }
        return result;
    }

    /** Returns the task submitted to the completion service: a call to {@code dataFetcher.getRecords()}. */
    private Callable<DataFetcherResult> createRetrieverCallable() {
        return this.dataFetcher::getRecords;
    }

    /** Shuts the executor down immediately via {@link ExecutorService#shutdownNow()}. */
    @Override
    public void shutdown() {
        this.executorService.shutdownNow();
    }

    /** Returns whether the underlying executor has been shut down. */
    @Override
    public boolean isShutdown() {
        return this.executorService.isShutdown();
    }

    /**
     * Builds the default executor: a daemon-thread pool with {@link #CORE_THREAD_POOL_COUNT} core
     * thread(s), at most {@code maxGetRecordsThreadPool} threads, a work queue of capacity one and
     * an abort policy, so that submissions beyond capacity raise {@link RejectedExecutionException}.
     *
     * @param maxGetRecordsThreadPool maximum pool size
     * @param shardId shard identifier embedded in the worker thread names
     * @return the configured executor
     */
    private static ExecutorService buildExector(int maxGetRecordsThreadPool, String shardId) {
        String threadNameFormat = "get-records-worker-" + shardId + "-%d";
        return new ThreadPoolExecutor(CORE_THREAD_POOL_COUNT, maxGetRecordsThreadPool, TIME_TO_KEEP_ALIVE, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadNameFormat).build(), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Returns the data fetcher this strategy delegates to. */
    @Override
    public KinesisDataFetcher getDataFetcher() {
        return this.dataFetcher;
    }
}

