/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTaskManager;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Runnable worker that drives record processing for the shards currently assigned to it.
 *
 * <p>{@link #run()} first initializes the lease coordinator and syncs shard info (retrying up to
 * {@code MAX_INITIALIZATION_ATTEMPTS} times), then loops until {@link #shutdown()} is called: on
 * every pass it fetches the current shard assignments from the lease coordinator, makes sure each
 * one has a {@link ShardConsumer}, asks that consumer to consume the shard, and begins shutting
 * down consumers whose shards are no longer assigned.
 */
public class Worker implements Runnable {
    private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
    private static final Log LOG = LogFactory.getLog(Worker.class);

    private final WorkerLog wlog = new WorkerLog();
    private final String applicationName;
    private final IRecordProcessorFactory recordProcessorFactory;
    private final StreamConfig streamConfig;
    private final InitialPositionInStream initialPosition;
    private final ICheckpoint checkpointTracker;
    private final long idleTimeInMilliseconds;
    private final long parentShardPollIntervalMillis;
    private final ExecutorService executorService;
    private final IMetricsFactory metricsFactory;
    private final long taskBackoffTimeMillis;
    private final KinesisClientLibLeaseCoordinator leaseCoordinator;
    private final ShardSyncTaskManager controlServer;
    // Written by shutdown() (typically from another thread) and polled by the run() loop,
    // so it must be volatile for the write to be guaranteed visible to the loop.
    private volatile boolean shutdown;
    private final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap =
            new ConcurrentHashMap<ShardInfo, ShardConsumer>();
    private final boolean cleanupLeasesUponShardCompletion;

    /**
     * Creates a worker that builds its own AWS clients from {@code config} and runs its tasks on
     * a new cached thread pool.
     *
     * @param recordProcessorFactory factory used to create a record processor per shard consumer
     * @param config client library configuration
     */
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config) {
        this(recordProcessorFactory, config, Executors.newCachedThreadPool());
    }

    /**
     * Creates a worker that builds its own Kinesis, DynamoDB and CloudWatch clients from
     * {@code config} and runs its tasks on the supplied executor.
     *
     * @param recordProcessorFactory factory used to create a record processor per shard consumer
     * @param config client library configuration
     * @param execService executor service handed to the shard sync manager and shard consumers
     */
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
            ExecutorService execService) {
        this(recordProcessorFactory,
                config,
                new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
                new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), config.getDynamoDBClientConfiguration()),
                new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), config.getCloudWatchClientConfiguration()),
                execService);
    }

    /**
     * Creates a worker that builds its own Kinesis and DynamoDB clients from {@code config}, uses
     * the supplied metrics factory, and runs its tasks on a new cached thread pool.
     *
     * @param recordProcessorFactory factory used to create a record processor per shard consumer
     * @param config client library configuration
     * @param metricsFactory metrics factory passed on to the lease coordinator and tasks
     */
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
            IMetricsFactory metricsFactory) {
        this(recordProcessorFactory, config, metricsFactory, Executors.newCachedThreadPool());
    }

    /**
     * Creates a worker that builds its own Kinesis and DynamoDB clients from {@code config} and
     * uses the supplied metrics factory and executor.
     *
     * @param recordProcessorFactory factory used to create a record processor per shard consumer
     * @param config client library configuration
     * @param metricsFactory metrics factory passed on to the lease coordinator and tasks
     * @param execService executor service handed to the shard sync manager and shard consumers
     */
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
            IMetricsFactory metricsFactory, ExecutorService execService) {
        this(recordProcessorFactory,
                config,
                new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
                new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), config.getDynamoDBClientConfiguration()),
                metricsFactory,
                execService);
    }

    /**
     * Creates a worker from caller-supplied AWS clients, running its tasks on a new cached
     * thread pool.
     *
     * @param recordProcessorFactory factory used to create a record processor per shard consumer
     * @param config client library configuration
     * @param kinesisClient Kinesis client to use
     * @param dynamoDBClient DynamoDB client to use
     * @param cloudWatchClient CloudWatch client to use
     */
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
            AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient,
            AmazonCloudWatchClient cloudWatchClient) {
        this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient,
                Executors.newCachedThreadPool());
    }

    /**
     * Creates a worker from caller-supplied AWS clients and executor. A {@link CWMetricsFactory}
     * is built around {@code cloudWatchClient}; if {@code config} carries a region name, that
     * region is then set on the CloudWatch client.
     *
     * @param recordProcessorFactory factory used to create a record processor per shard consumer
     * @param config client library configuration
     * @param kinesisClient Kinesis client to use
     * @param dynamoDBClient DynamoDB client to use
     * @param cloudWatchClient CloudWatch client to use
     * @param execService executor service handed to the shard sync manager and shard consumers
     */
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
            AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient,
            AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) {
        this(recordProcessorFactory,
                config,
                kinesisClient,
                dynamoDBClient,
                new CWMetricsFactory(cloudWatchClient, config.getApplicationName(),
                        config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize()),
                execService);
        if (config.getRegionName() != null) {
            Region region = RegionUtils.getRegion(config.getRegionName());
            cloudWatchClient.setRegion(region);
            LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
        }
    }

    /**
     * Creates a worker from caller-supplied Kinesis and DynamoDB clients, metrics factory and
     * executor. If {@code config} carries a region name it is applied to both clients; if it
     * carries a Kinesis endpoint, that endpoint is set on the Kinesis client afterwards and
     * therefore takes precedence over the region for Kinesis.
     *
     * @param recordProcessorFactory factory used to create a record processor per shard consumer
     * @param config client library configuration
     * @param kinesisClient Kinesis client to use
     * @param dynamoDBClient DynamoDB client to use
     * @param metricsFactory metrics factory passed on to the lease coordinator and tasks
     * @param execService executor service handed to the shard sync manager and shard consumers
     */
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
            AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient,
            IMetricsFactory metricsFactory, ExecutorService execService) {
        this(recordProcessorFactory,
                config,
                new StreamConfig(
                        new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
                                .getProxy(config.getStreamName()),
                        config.getMaxRecords(),
                        config.getIdleTimeBetweenReadsInMillis(),
                        config.shouldCallProcessRecordsEvenForEmptyRecordList(),
                        config.shouldValidateSequenceNumberBeforeCheckpointing()),
                new KinesisClientLibLeaseCoordinator(
                        new KinesisClientLeaseManager(config.getApplicationName(), (AmazonDynamoDB)dynamoDBClient),
                        config.getWorkerIdentifier(),
                        config.getFailoverTimeMillis(),
                        config.getEpsilonMillis(),
                        metricsFactory),
                metricsFactory,
                execService);
        if (config.getRegionName() != null) {
            Region region = RegionUtils.getRegion(config.getRegionName());
            kinesisClient.setRegion(region);
            LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
            dynamoDBClient.setRegion(region);
            LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
        }
        if (config.getKinesisEndpoint() != null) {
            // Applied after the region on purpose: an explicit endpoint wins over the region name.
            kinesisClient.setEndpoint(config.getKinesisEndpoint());
            if (config.getRegionName() != null) {
                LOG.warn("Received configuration for both region name as " + config.getRegionName() + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() + ". Amazon Kinesis endpoint will overwrite region name.");
                LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint());
            } else {
                LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
            }
        }
    }

    /**
     * Unpacks {@code config} into the individual settings of the main constructor. The lease
     * coordinator is passed both as the checkpoint tracker and as the lease coordinator.
     */
    private Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
            StreamConfig streamConfig, KinesisClientLibLeaseCoordinator leaseCoordinator,
            IMetricsFactory metricsFactory, ExecutorService execService) {
        this(config.getApplicationName(),
                recordProcessorFactory,
                streamConfig,
                config.getInitialPositionInStream(),
                config.getParentShardPollIntervalMillis(),
                config.getShardSyncIntervalMillis(),
                config.shouldCleanupLeasesUponShardCompletion(),
                leaseCoordinator,
                leaseCoordinator,
                execService,
                metricsFactory,
                config.getTaskBackoffTimeMillis());
    }

    /**
     * Main constructor; every other constructor ends up here.
     *
     * @param applicationName name returned by {@link #getApplicationName()}
     * @param recordProcessorFactory factory used to create a record processor per shard consumer
     * @param streamConfig stream settings; also supplies the stream proxy and the worker loop's
     *        idle time
     * @param initialPositionInStream initial position passed to the shard sync tasks
     * @param parentShardPollIntervalMillis interval passed to each shard consumer; also used as
     *        the pause between initialization attempts
     * @param shardSyncIdleTimeMillis idle time passed to the shard sync task manager
     * @param cleanupLeasesUponShardCompletion flag passed to shard sync tasks and shard consumers
     * @param checkpoint checkpoint tracker passed to each shard consumer
     * @param leaseCoordinator lease coordinator that supplies the current shard assignments
     * @param execService executor service handed to the shard sync manager and shard consumers
     * @param metricsFactory metrics factory passed to tasks and shard consumers
     * @param taskBackoffTimeMillis back-off time passed to each shard consumer
     */
    Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
            InitialPositionInStream initialPositionInStream, long parentShardPollIntervalMillis,
            long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
            KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
            IMetricsFactory metricsFactory, long taskBackoffTimeMillis) {
        this.applicationName = applicationName;
        this.recordProcessorFactory = recordProcessorFactory;
        this.streamConfig = streamConfig;
        this.initialPosition = initialPositionInStream;
        this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
        this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
        this.checkpointTracker = checkpoint;
        this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
        this.executorService = execService;
        this.leaseCoordinator = leaseCoordinator;
        this.metricsFactory = metricsFactory;
        this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(),
                leaseCoordinator.getLeaseManager(), initialPositionInStream, cleanupLeasesUponShardCompletion,
                shardSyncIdleTimeMillis, metricsFactory, this.executorService);
        this.taskBackoffTimeMillis = taskBackoffTimeMillis;
    }

    /**
     * @return the application name this worker was configured with
     */
    public String getApplicationName() {
        return applicationName;
    }

    /**
     * Initializes the worker and then runs the worker loop until {@link #shutdown()} is called.
     * If initialization fails, the failure is logged, the shutdown flag is set and the loop is
     * skipped. In every case the lease coordinator is stopped before this method returns.
     */
    @Override
    public void run() {
        try {
            initialize();
            LOG.info("Initialization complete. Starting worker loop.");
        } catch (RuntimeException e1) {
            LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1);
            shutdown();
        }

        while (!shutdown) {
            try {
                boolean foundCompletedShard = false;
                Set<ShardInfo> assignedShards = new HashSet<ShardInfo>();
                // getShardInfoForAssignments() passes a null list through unchanged, so guard
                // here instead of failing every pass with a NullPointerException.
                List<ShardInfo> currentAssignments = getShardInfoForAssignments();
                if (currentAssignments != null) {
                    for (ShardInfo shardInfo : currentAssignments) {
                        ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
                        if (shardConsumer.isShutdown()
                                && ShutdownReason.TERMINATE.equals(shardConsumer.getShutdownReason())) {
                            foundCompletedShard = true;
                        } else {
                            shardConsumer.consumeShard();
                        }
                        assignedShards.add(shardInfo);
                    }
                }

                if (foundCompletedShard) {
                    controlServer.syncShardAndLeaseInfo(null);
                }

                cleanupShardConsumers(assignedShards);

                wlog.info("Sleeping ...");
                Thread.sleep(idleTimeInMilliseconds);
            } catch (Exception e) {
                // NOTE(review): an InterruptedException from the sleep above also lands here and
                // is treated like any other failure (log, pause, keep looping) - confirm that
                // interrupting the worker thread is not expected to stop the worker.
                LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", String.valueOf(idleTimeInMilliseconds)), e);
                try {
                    Thread.sleep(idleTimeInMilliseconds);
                } catch (InterruptedException ex) {
                    LOG.info("Worker: sleep interrupted after catching exception ", ex);
                }
            }
            wlog.resetInfoLogging();
        }

        LOG.info("Stopping LeaseCoordinator.");
        leaseCoordinator.stop();
    }

    /**
     * Initializes the lease coordinator, runs a shard sync task and, once the sync succeeds,
     * starts the lease coordinator if it is not already running. Makes up to
     * {@code MAX_INITIALIZATION_ATTEMPTS} attempts, pausing {@code parentShardPollIntervalMillis}
     * after each failed one.
     *
     * @throws RuntimeException wrapping the last failure if no attempt succeeded
     */
    private void initialize() {
        boolean isDone = false;
        Exception lastException = null;

        for (int i = 0; !isDone && i < MAX_INITIALIZATION_ATTEMPTS; ++i) {
            try {
                LOG.info("Initialization attempt " + (i + 1));
                LOG.info("Initializing LeaseCoordinator");
                leaseCoordinator.initialize();

                LOG.info("Syncing Kinesis shard info");
                ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
                        leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L);
                TaskResult result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();

                if (result.getException() == null) {
                    if (!leaseCoordinator.isRunning()) {
                        LOG.info("Starting LeaseCoordinator");
                        leaseCoordinator.start();
                    } else {
                        LOG.info("LeaseCoordinator is already running. No need to start it.");
                    }
                    isDone = true;
                } else {
                    lastException = result.getException();
                }
            } catch (LeasingException e) {
                LOG.error("Caught exception when initializing LeaseCoordinator", e);
                lastException = e;
            } catch (Exception e) {
                // Log every failed attempt; otherwise only the last one is ever surfaced.
                LOG.error("Caught exception during worker initialization attempt " + (i + 1), e);
                lastException = e;
            }

            // Pause only when another attempt may follow; a successful attempt proceeds at once.
            if (!isDone) {
                try {
                    Thread.sleep(parentShardPollIntervalMillis);
                } catch (InterruptedException e) {
                    LOG.debug("Sleep interrupted while initializing worker.");
                }
            }
        }

        if (!isDone) {
            throw new RuntimeException(lastException);
        }
    }

    /**
     * Begins shutting down every tracked shard consumer whose shard is not in
     * {@code assignedShards}, and stops tracking a consumer once its {@code beginShutdown()}
     * call returns {@code true}.
     *
     * @param assignedShards shards currently assigned to this worker; their consumers are kept
     */
    void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
        for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) {
            if (assignedShards.contains(shard)) {
                continue;
            }
            // Consumers that report false stay in the map and are asked again on the next pass.
            if (shardInfoShardConsumerMap.get(shard).beginShutdown()) {
                shardInfoShardConsumerMap.remove(shard);
            }
        }
    }

    /**
     * Fetches the current shard assignments from the lease coordinator and reports them through
     * the throttled worker log.
     *
     * @return the list returned by the lease coordinator, unchanged (so it is {@code null} if the
     *         coordinator returned {@code null})
     */
    private List<ShardInfo> getShardInfoForAssignments() {
        List<ShardInfo> assignedStreamShards = leaseCoordinator.getCurrentAssignments();

        if (assignedStreamShards != null && !assignedStreamShards.isEmpty()) {
            // Only build the message when it will actually be logged.
            if (wlog.isInfoEnabled()) {
                StringBuilder builder = new StringBuilder();
                boolean firstItem = true;
                for (ShardInfo shardInfo : assignedStreamShards) {
                    if (!firstItem) {
                        builder.append(", ");
                    }
                    builder.append(shardInfo.getShardId());
                    firstItem = false;
                }
                wlog.info("Current stream shard assignments: " + builder.toString());
            }
        } else {
            wlog.info("No activities assigned");
        }

        return assignedStreamShards;
    }

    /**
     * Asks the worker to stop. This only sets a flag: the {@link #run()} loop checks it at the
     * top of each iteration, so the current pass (including its sleep) completes first.
     */
    public void shutdown() {
        this.shutdown = true;
    }

    /**
     * Returns the shard consumer tracked for {@code shardInfo}, creating and registering a new
     * one if there is none yet or if the existing one has shut down with reason
     * {@link ShutdownReason#ZOMBIE}.
     *
     * @param shardInfo shard the consumer is for
     * @param factory factory used to create the record processor of a new consumer
     * @return the existing or newly created shard consumer
     */
    ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
        synchronized (shardInfoShardConsumerMap) {
            ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
            boolean needsNewConsumer = consumer == null
                    || (consumer.isShutdown() && ShutdownReason.ZOMBIE.equals(consumer.getShutdownReason()));
            if (needsNewConsumer) {
                IRecordProcessor recordProcessor = factory.createProcessor();
                consumer = new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
                        leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis,
                        cleanupLeasesUponShardCompletion, executorService, metricsFactory, taskBackoffTimeMillis);
                shardInfoShardConsumerMap.put(shardInfo, consumer);
                wlog.infoForce("Created new shardConsumer for : " + shardInfo);
            }
            return consumer;
        }
    }

    /**
     * Throttles the worker loop's INFO messages. {@link #info(Object)} only logs while the
     * {@code infoReporting} flag is set; {@code resetInfoLogging()}, called once per loop pass,
     * clears the flag after a pass in which it was set and sets it again once
     * {@code reportIntervalMillis} (one minute) has elapsed.
     */
    private static class WorkerLog {
        private final long reportIntervalMillis = TimeUnit.MINUTES.toMillis(1L);
        private long nextReportTime = System.currentTimeMillis() + this.reportIntervalMillis;
        private boolean infoReporting;

        private WorkerLog() {
        }

        /** Logs at DEBUG level, unthrottled. */
        public void debug(Object message, Throwable t) {
            LOG.debug(message, t);
        }

        /** Logs at INFO level, but only while throttled reporting is switched on. */
        public void info(Object message) {
            if (this.isInfoEnabled()) {
                LOG.info(message);
            }
        }

        /** Logs at INFO level regardless of the throttle. */
        public void infoForce(Object message) {
            LOG.info(message);
        }

        /** Logs at WARN level, unthrottled. */
        public void warn(Object message) {
            LOG.warn(message);
        }

        /** Logs at ERROR level, unthrottled. */
        public void error(Object message, Throwable t) {
            LOG.error(message, t);
        }

        /** @return whether throttled INFO messages are currently being logged */
        private boolean isInfoEnabled() {
            return this.infoReporting;
        }

        /**
         * Advances the throttle by one loop pass: switches reporting off (and schedules the next
         * reporting time) if it was on and the underlying logger has INFO enabled; otherwise
         * switches it on once the scheduled reporting time has been reached.
         */
        private void resetInfoLogging() {
            if (this.infoReporting) {
                if (LOG.isInfoEnabled()) {
                    this.infoReporting = false;
                    this.nextReportTime = System.currentTimeMillis() + this.reportIntervalMillis;
                }
            } else if (this.nextReportTime <= System.currentTimeMillis()) {
                this.infoReporting = true;
            }
        }
    }
}

