/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb.stream;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumerFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamScheduler
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);
    private static final Logger SHARD_COUNT_LOGGER = LoggerFactory.getLogger((String)"org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardCountLogger");
    private static final int MAX_JOB_COUNT = 150;
    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15000;
    static final String ACTIVE_CHANGE_EVENT_CONSUMERS = "activeChangeEventConsumers";
    static final String SHARDS_IN_PROCESSING = "activeShardsInProcessing";
    private final AtomicInteger numOfWorkers = new AtomicInteger(0);
    private final EnhancedSourceCoordinator coordinator;
    private final ShardConsumerFactory consumerFactory;
    private final ExecutorService executor;
    private final PluginMetrics pluginMetrics;
    private final AtomicLong activeChangeEventConsumers;
    private final AtomicLong shardsInProcessing;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final DynamoDBSourceConfig dynamoDBSourceConfig;
    private final BackoffCalculator backoffCalculator;
    private int noAvailableShardsCount = 0;

    public StreamScheduler(EnhancedSourceCoordinator coordinator, ShardConsumerFactory consumerFactory, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager, DynamoDBSourceConfig dynamoDBSourceConfig, BackoffCalculator backoffCalculator) {
        this.coordinator = coordinator;
        this.consumerFactory = consumerFactory;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.dynamoDBSourceConfig = dynamoDBSourceConfig;
        this.backoffCalculator = backoffCalculator;
        this.executor = Executors.newFixedThreadPool(150);
        this.activeChangeEventConsumers = (AtomicLong)pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, (Number)new AtomicLong());
        this.shardsInProcessing = (AtomicLong)pluginMetrics.gauge(SHARDS_IN_PROCESSING, (Number)new AtomicLong());
    }

    private void processStreamPartition(StreamPartition streamPartition) {
        Runnable shardConsumer;
        boolean acknowledgmentsEnabled = this.dynamoDBSourceConfig.isAcknowledgmentsEnabled();
        AcknowledgementSet acknowledgementSet = null;
        if (acknowledgmentsEnabled) {
            acknowledgementSet = this.acknowledgementSetManager.create(result -> {
                if (result.booleanValue()) {
                    LOG.info("Received acknowledgment of completion from sink for shard {}", (Object)streamPartition.getShardId());
                    this.completeConsumer(streamPartition).accept(null, null);
                } else {
                    LOG.warn("Negative acknowledgment received for shard {}, it will be retried", (Object)streamPartition.getShardId());
                    this.coordinator.giveUpPartition((EnhancedSourcePartition)streamPartition);
                }
            }, this.dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
        }
        if ((shardConsumer = this.consumerFactory.createConsumer(streamPartition, acknowledgementSet, this.dynamoDBSourceConfig.getShardAcknowledgmentTimeout())) != null) {
            CompletableFuture<Void> runConsumer = CompletableFuture.runAsync(shardConsumer, this.executor);
            if (acknowledgmentsEnabled) {
                runConsumer.whenComplete((v, ex) -> {
                    this.numOfWorkers.decrementAndGet();
                    if (ex != null) {
                        LOG.error(DataPrepperMarkers.NOISY, "Received exception while processing shard {}, giving up this shard for reprocessing: {}", (Object)streamPartition.getShardId(), ex);
                        this.coordinator.giveUpPartition((EnhancedSourcePartition)streamPartition);
                    }
                    if (this.numOfWorkers.get() == 0) {
                        this.activeChangeEventConsumers.decrementAndGet();
                    }
                    this.shardsInProcessing.decrementAndGet();
                });
            } else {
                runConsumer.whenComplete(this.completeConsumer(streamPartition));
            }
            this.numOfWorkers.incrementAndGet();
            if (this.numOfWorkers.get() % 10 == 0) {
                SHARD_COUNT_LOGGER.info("Actively processing {} shards", (Object)this.numOfWorkers.get());
            }
            if (this.numOfWorkers.get() >= 1) {
                this.activeChangeEventConsumers.incrementAndGet();
            }
            this.shardsInProcessing.incrementAndGet();
        } else {
            this.coordinator.completePartition((EnhancedSourcePartition)streamPartition);
        }
    }

    @Override
    public void run() {
        LOG.debug("Stream Scheduler start to run...");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (this.numOfWorkers.get() < 150) {
                    Optional sourcePartition = this.coordinator.acquireAvailablePartition("STREAM");
                    if (sourcePartition.isPresent()) {
                        StreamPartition streamPartition = (StreamPartition)((Object)sourcePartition.get());
                        this.processStreamPartition(streamPartition);
                        this.noAvailableShardsCount = 0;
                    } else {
                        ++this.noAvailableShardsCount;
                    }
                }
                try {
                    Thread.sleep(this.backoffCalculator.calculateBackoffToAcquireNextShard(this.noAvailableShardsCount, this.numOfWorkers));
                    continue;
                }
                catch (InterruptedException e) {
                    LOG.info("InterruptedException occurred");
                }
            }
            catch (Exception e) {
                LOG.error("Received an exception while processing a shard for streams, backing off and retrying", (Throwable)e);
                try {
                    Thread.sleep(15000L);
                    continue;
                }
                catch (InterruptedException ex) {
                    LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing");
                }
            }
            break;
        }
        LOG.warn("Stream Scheduler is interrupted, looks like shutdown has triggered");
        ShardConsumer.stopAll();
        this.executor.shutdown();
    }

    private BiConsumer completeConsumer(StreamPartition streamPartition) {
        return (v, ex) -> {
            if (!this.dynamoDBSourceConfig.isAcknowledgmentsEnabled()) {
                this.numOfWorkers.decrementAndGet();
                if (this.numOfWorkers.get() == 0) {
                    this.activeChangeEventConsumers.decrementAndGet();
                }
                this.shardsInProcessing.decrementAndGet();
            }
            if (ex == null) {
                LOG.info("Shard consumer for {} is completed", (Object)streamPartition.getShardId());
                this.coordinator.completePartition((EnhancedSourcePartition)streamPartition);
            } else {
                LOG.error("Received an exception while processing shard {}, giving up shard: {}", (Object)streamPartition.getShardId(), ex);
                this.coordinator.giveUpPartition((EnhancedSourcePartition)streamPartition);
            }
        };
    }
}

