/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.distribution;

import io.camunda.zeebe.engine.state.immutable.DistributionState;
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically re-sends command distributions that the {@link DistributionState} still reports
 * as retriable, spacing the attempts out with a cycle-based backoff.
 *
 * <p>A retry cycle is scheduled with {@link #COMMAND_REDISTRIBUTION_INTERVAL}. For every
 * retriable distribution a counter records in how many consecutive cycles it has been seen:
 *
 * <ul>
 *   <li>cycle 0 (first time seen): nothing is sent;
 *   <li>cycles 1, 2, 4, 8, 16 (powers of two below {@code MAX_RETRY_CYCLES}): the command is sent;
 *   <li>from cycle {@code MAX_RETRY_CYCLES} (30) onwards: the command is sent on every multiple
 *       of {@code MAX_RETRY_CYCLES}.
 * </ul>
 *
 * <p>The counters are kept in an in-memory map owned by this instance; an entry is dropped as
 * soon as its distribution is no longer reported as retriable.
 */
public final class CommandRedistributor implements StreamProcessorLifecycleAware {
    /** Interval with which the retry cycle is scheduled. */
    public static final Duration COMMAND_REDISTRIBUTION_INTERVAL = Duration.ofSeconds(10L);

    /** Backoff ceiling; expressed in retry cycles by {@link #MAX_RETRY_CYCLES}. */
    private static final Duration RETRY_MAX_BACKOFF_DURATION = Duration.ofMinutes(5L);

    /** Number of retry intervals that fit into the backoff ceiling (5 min / 10 s = 30). */
    private static final long MAX_RETRY_CYCLES =
        RETRY_MAX_BACKOFF_DURATION.dividedBy(COMMAND_REDISTRIBUTION_INTERVAL);

    private static final Logger LOG = LoggerFactory.getLogger(CommandRedistributor.class);

    private final DistributionState distributionState;
    private final InterPartitionCommandSender commandSender;

    /** Retry-cycle counter per distribution that is currently retriable. */
    private final Map<RetriableDistribution, Long> retryCyclesPerDistribution = new HashMap<>();

    /**
     * @param distributionState source of the distributions that are still retriable
     * @param commandSender sender used to re-send a command to its target partition
     */
    public CommandRedistributor(
            final DistributionState distributionState,
            final InterPartitionCommandSender commandSender) {
        this.distributionState = distributionState;
        this.commandSender = commandSender;
    }

    /**
     * Registers {@link #runRetryCycle()} with the context's schedule service, using
     * {@link #COMMAND_REDISTRIBUTION_INTERVAL} as the rate.
     *
     * @param context context providing the schedule service
     */
    public void onRecovered(final ReadonlyStreamProcessorContext context) {
        context.getScheduleService()
            .runAtFixedRate(COMMAND_REDISTRIBUTION_INTERVAL, this::runRetryCycle);
    }

    /**
     * Runs one retry cycle: visits every retriable distribution, retries those that are due, and
     * afterwards forgets the counters of distributions that were not visited in this cycle.
     */
    private void runRetryCycle() {
        // Typed set (not a raw HashSet) so add/contains are checked against RetriableDistribution.
        final var seenThisCycle = new HashSet<RetriableDistribution>();

        this.distributionState.foreachRetriableDistribution((distributionKey, distributionRecord) -> {
            final RetriableDistribution retriable =
                new RetriableDistribution(distributionKey, distributionRecord.getPartitionId());
            seenThisCycle.add(retriable);
            this.retryDistribution(retriable, distributionRecord);
        });

        // Anything not reported as retriable anymore must not keep a stale counter around.
        this.retryCyclesPerDistribution.keySet().removeIf(Predicate.not(seenThisCycle::contains));
    }

    /**
     * Re-sends the distributed command to its target partition if the backoff schedule says this
     * cycle is a retry cycle for the given distribution; otherwise does nothing.
     *
     * @param retriable identifies the distribution and its target partition
     * @param commandDistributionRecord record carrying the value type, intent and command value
     */
    private void retryDistribution(
            final RetriableDistribution retriable,
            final CommandDistributionRecord commandDistributionRecord) {
        if (!this.shouldRetryNow(retriable)) {
            return;
        }

        LOG.info(
            "Retrying to distribute retriable command {} to partition {}",
            retriable.distributionKey(),
            retriable.partitionId());
        this.commandSender.sendCommand(
            retriable.partitionId(),
            commandDistributionRecord.getValueType(),
            commandDistributionRecord.getIntent(),
            Long.valueOf(retriable.distributionKey()),
            commandDistributionRecord.getCommandValue());
    }

    /**
     * Advances the cycle counter of the given distribution and decides whether the command
     * should be sent in this cycle.
     *
     * <p>The counter starts at 0 the first time a distribution is seen and is incremented by one
     * on every later call.
     *
     * @param retriableDistribution the distribution whose counter is advanced
     * @return {@code true} if the (updated) counter is a power of two below
     *     {@code MAX_RETRY_CYCLES}, or a multiple of {@code MAX_RETRY_CYCLES} once that value
     *     has been reached; {@code false} otherwise (including for counter 0)
     */
    private boolean shouldRetryNow(final RetriableDistribution retriableDistribution) {
        final long retryCycle = this.retryCyclesPerDistribution.compute(
            retriableDistribution,
            (key, previousCycles) -> previousCycles == null ? 0L : previousCycles + 1L);

        if (retryCycle >= MAX_RETRY_CYCLES) {
            // Backoff ceiling reached: retry once per MAX_RETRY_CYCLES cycles.
            return retryCycle % MAX_RETRY_CYCLES == 0L;
        }
        // Exactly one bit set <=> power of two; 0 has no bit set, so the first cycle is skipped.
        return Long.bitCount(retryCycle) == 1;
    }

    /** Map key identifying one distribution to one target partition. */
    private record RetriableDistribution(long distributionKey, int partitionId) {
    }
}

