/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsRetrievalStrategy;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * {@link GetRecordsRetrievalStrategy} that issues {@code getRecords} calls on a thread pool and
 * keeps submitting additional attempts until one of them completes successfully.
 *
 * <p>Each call to {@link #getRecords(int)} submits a retriever task, waits up to
 * {@code retryGetRecordsInSeconds} seconds for any task to finish, and submits another task if
 * nothing finished in time. Once a result is obtained (or the calling thread is interrupted),
 * every task submitted by that call is cancelled.
 */
public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy {
    private static final Log log = LogFactory.getLog(AsynchronousGetRecordsRetrievalStrategy.class);

    /** Keep-alive time, in seconds, for pool threads above the core size. */
    private static final int TIME_TO_KEEP_ALIVE = 5;

    /** Core size of the thread pool created by {@link #buildExector(int, String)}. */
    private static final int CORE_THREAD_POOL_COUNT = 1;

    private final KinesisDataFetcher dataFetcher;
    private final ExecutorService executorService;
    private final int retryGetRecordsInSeconds;
    private final String shardId;
    final CompletionService<GetRecordsResult> completionService;

    /**
     * Creates a strategy backed by a newly built thread pool.
     *
     * @param dataFetcher fetcher the retriever tasks delegate to; must not be {@code null}
     * @param retryGetRecordsInSeconds seconds to wait for a task to complete before submitting
     *        another attempt
     * @param maxGetRecordsThreadPool maximum number of threads in the pool
     * @param shardId shard identifier, used in the worker thread names
     * @throws NullPointerException if {@code dataFetcher} is {@code null}
     */
    public AsynchronousGetRecordsRetrievalStrategy(@NonNull KinesisDataFetcher dataFetcher, int retryGetRecordsInSeconds, int maxGetRecordsThreadPool, String shardId) {
        this(dataFetcher, buildExector(maxGetRecordsThreadPool, shardId), retryGetRecordsInSeconds, shardId);
        // The delegating constructor call has to come first, so the null check runs after it.
        if (dataFetcher == null) {
            throw new NullPointerException("dataFetcher");
        }
    }

    /**
     * Creates a strategy that runs its tasks on the supplied executor, wrapped in a new
     * {@link ExecutorCompletionService}.
     *
     * @param dataFetcher fetcher the retriever tasks delegate to
     * @param executorService executor the retriever tasks are submitted to
     * @param retryGetRecordsInSeconds seconds to wait for a task to complete before submitting
     *        another attempt
     * @param shardId shard identifier
     */
    public AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, String shardId) {
        this(dataFetcher, executorService, retryGetRecordsInSeconds, new ExecutorCompletionService<GetRecordsResult>(executorService), shardId);
    }

    /**
     * Package-private constructor that additionally accepts the completion service to use.
     *
     * @param dataFetcher fetcher the retriever tasks delegate to
     * @param executorService executor used for the shutdown checks and for {@link #shutdown()}
     * @param retryGetRecordsInSeconds seconds to wait for a task to complete before submitting
     *        another attempt
     * @param completionService completion service the tasks are submitted to and polled from
     * @param shardId shard identifier
     */
    AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, CompletionService<GetRecordsResult> completionService, String shardId) {
        this.dataFetcher = dataFetcher;
        this.executorService = executorService;
        this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
        this.completionService = completionService;
        this.shardId = shardId;
    }

    /**
     * Retrieves records by racing one or more asynchronous fetch attempts.
     *
     * <p>If the calling thread is interrupted while waiting, the method stops waiting, cancels
     * the outstanding tasks, restores the thread's interrupt status and returns {@code null}.
     *
     * @param maxRecords maximum number of records passed to the data fetcher
     * @return the result of the first task that completed successfully, or {@code null} if the
     *         calling thread was interrupted before any task succeeded
     * @throws IllegalStateException if the executor has already been shut down
     */
    @Override
    public GetRecordsResult getRecords(int maxRecords) {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("Strategy has been shutdown");
        }
        GetRecordsResult result = null;
        boolean interrupted = false;
        HashSet<Future<GetRecordsResult>> futures = new HashSet<>();
        Callable<GetRecordsResult> retrieverCall = this.createRetrieverCallable(maxRecords);
        while (true) {
            try {
                futures.add(this.completionService.submit(retrieverCall));
            } catch (RejectedExecutionException e) {
                // The pool is saturated; keep waiting on the tasks that are already running.
                log.warn("Out of resources, unable to start additional requests.");
            }
            try {
                Future<GetRecordsResult> resultFuture = this.completionService.poll(this.retryGetRecordsInSeconds, TimeUnit.SECONDS);
                if (resultFuture != null) {
                    result = resultFuture.get();
                    break;
                }
                // Nothing finished within the timeout: loop around and submit another attempt.
            } catch (ExecutionException e) {
                // The completed task failed; try again with a new submission.
                log.error("ExecutionException thrown while trying to get records", e);
            } catch (InterruptedException e) {
                log.error("Thread was interrupted", e);
                // Defer restoring the interrupt status until after cleanup, otherwise the
                // blocking take() calls below would fail immediately.
                interrupted = true;
                break;
            }
        }
        if (this.cancelAndDrain(futures)) {
            interrupted = true;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    /**
     * Cancels every given future and, for each one that reports itself cancelled, removes one
     * entry from the completion service.
     *
     * <p>NOTE(review): futures that had already completed but were never polled are not drained
     * here and so appear to remain queued in the completion service for a later call — confirm
     * whether that is intended.
     *
     * @param futures futures submitted by the current {@code getRecords} call
     * @return {@code true} if the thread was interrupted while draining
     */
    private boolean cancelAndDrain(Iterable<Future<GetRecordsResult>> futures) {
        boolean interrupted = false;
        for (Future<GetRecordsResult> future : futures) {
            future.cancel(true);
            if (!future.isCancelled()) {
                continue;
            }
            try {
                this.completionService.take();
            } catch (InterruptedException e) {
                log.error("Exception thrown while trying to empty the threadpool.", e);
                interrupted = true;
            }
        }
        return interrupted;
    }

    /**
     * Builds the task that performs a single fetch through the data fetcher.
     *
     * <p>The metrics scope current at the time this method is called is wrapped in a
     * {@link ThreadSafeMetricsDelegatingScope}; the task sets it before fetching and unsets it
     * afterwards, whether or not the fetch succeeds.
     *
     * @param maxRecords maximum number of records passed to the data fetcher
     * @return a callable that returns the fetcher's result
     */
    private Callable<GetRecordsResult> createRetrieverCallable(int maxRecords) {
        ThreadSafeMetricsDelegatingScope metricsScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope());
        return () -> {
            try {
                MetricsHelper.setMetricsScope(metricsScope);
                return this.dataFetcher.getRecords(maxRecords);
            } finally {
                MetricsHelper.unsetMetricsScope();
            }
        };
    }

    /** Shuts the executor down immediately via {@link ExecutorService#shutdownNow()}. */
    @Override
    public void shutdown() {
        this.executorService.shutdownNow();
    }

    /**
     * Reports whether the executor has been shut down.
     *
     * @return {@code true} if the underlying executor reports it is shut down
     */
    @Override
    public boolean isShutdown() {
        return this.executorService.isShutdown();
    }

    /**
     * Builds the thread pool used by the public four-argument constructor that takes a pool size.
     *
     * <p>The pool has {@link #CORE_THREAD_POOL_COUNT} core thread(s), at most
     * {@code maxGetRecordsThreadPool} threads, a keep-alive of {@link #TIME_TO_KEEP_ALIVE}
     * seconds, a work queue of capacity one, daemon threads named
     * {@code get-records-worker-<shardId>-<n>}, and an abort policy for rejected submissions.
     *
     * @param maxGetRecordsThreadPool maximum number of threads in the pool
     * @param shardId shard identifier embedded in the thread names
     * @return the configured executor
     */
    private static ExecutorService buildExector(int maxGetRecordsThreadPool, String shardId) {
        String threadNameFormat = "get-records-worker-" + shardId + "-%d";
        return new ThreadPoolExecutor(
                CORE_THREAD_POOL_COUNT,
                maxGetRecordsThreadPool,
                TIME_TO_KEEP_ALIVE,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadNameFormat).build(),
                new ThreadPoolExecutor.AbortPolicy());
    }
}

