/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.distribution;

import io.camunda.zeebe.engine.processing.distribution.CommandDistributionBehavior;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.DistributionState;
import io.camunda.zeebe.protocol.impl.record.value.distribution.CommandDistributionRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import java.util.Optional;

/**
 * Processes a {@link CommandDistributionRecord} command that acknowledges a distribution.
 *
 * <p>The record key is treated as the distribution key and the partition id is read from the
 * record value. If the {@link DistributionState} has no pending distribution for that pair, the
 * command is rejected with {@link RejectionType#NOT_FOUND}. Otherwise an
 * {@link CommandDistributionIntent#ACKNOWLEDGED} event is appended, the next entry of the
 * distribution's queue (if it has a queue id) is handed to the
 * {@link CommandDistributionBehavior}, and a {@link CommandDistributionIntent#FINISH} command is
 * written once no pending distribution remains for the key.
 */
public class CommandDistributionAcknowledgeProcessor
        implements TypedRecordProcessor<CommandDistributionRecord> {
    private static final String ERROR_PENDING_DISTRIBUTION_NOT_FOUND = "Expected to find pending distribution with key %d for partition %d, but no pending distribution was found.";

    private final CommandDistributionBehavior commandDistributionBehavior;
    private final DistributionState distributionState;
    private final TypedCommandWriter commandWriter;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;

    /**
     * Creates the processor.
     *
     * @param commandDistributionBehavior used to distribute the next command of a queue
     * @param distributionState state queried for pending distributions and queue ids
     * @param writers source of the command, state and rejection writers used by this processor
     */
    public CommandDistributionAcknowledgeProcessor(
            CommandDistributionBehavior commandDistributionBehavior,
            DistributionState distributionState,
            Writers writers) {
        this.commandDistributionBehavior = commandDistributionBehavior;
        this.distributionState = distributionState;
        commandWriter = writers.command();
        stateWriter = writers.state();
        rejectionWriter = writers.rejection();
    }

    /**
     * Handles a single acknowledgement record.
     *
     * @param record the command to process; its key identifies the distribution and its value
     *     carries the partition id the acknowledgement refers to
     */
    @Override
    public void processRecord(TypedRecord<CommandDistributionRecord> record) {
        final long distributionKey = record.getKey();
        final CommandDistributionRecord acknowledgement = record.getValue();
        final int partitionId = acknowledgement.getPartitionId();

        if (!distributionState.hasPendingDistribution(distributionKey, partitionId)) {
            rejectAsNotFound(record, distributionKey, partitionId);
            return;
        }

        stateWriter.appendFollowUpEvent(
                distributionKey, CommandDistributionIntent.ACKNOWLEDGED, acknowledgement);

        // The queue id is looked up only after the ACKNOWLEDGED event has been appended.
        final Optional<String> queueId =
                distributionState.getQueueIdForDistribution(distributionKey);
        queueId.ifPresent(
                queue -> commandDistributionBehavior.distributeNextInQueue(queue, partitionId));

        final boolean stillPending = distributionState.hasPendingDistribution(distributionKey);
        if (stillPending) {
            return;
        }
        writeFinishCommand(record, distributionKey, acknowledgement, queueId);
    }

    /** Appends a NOT_FOUND rejection naming the distribution key and partition id. */
    private void rejectAsNotFound(
            TypedRecord<CommandDistributionRecord> record, long distributionKey, int partitionId) {
        final String reason =
                String.format(ERROR_PENDING_DISTRIBUTION_NOT_FOUND, distributionKey, partitionId);
        rejectionWriter.appendRejection(record, RejectionType.NOT_FOUND, reason);
    }

    /**
     * Writes the FINISH follow-up command. The command value is a fresh record that carries only
     * the partition id of the processed record, the value type and intent of the acknowledged
     * value, and the queue id when one exists; it is not the received value itself.
     */
    private void writeFinishCommand(
            TypedRecord<CommandDistributionRecord> record,
            long distributionKey,
            CommandDistributionRecord acknowledgement,
            Optional<String> queueId) {
        final CommandDistributionRecord finishRecord =
                new CommandDistributionRecord()
                        .setPartitionId(record.getPartitionId())
                        .setValueType(acknowledgement.getValueType())
                        .setIntent(acknowledgement.getIntent());
        queueId.ifPresent(finishRecord::setQueueId);
        commandWriter.appendFollowUpCommand(
                distributionKey, CommandDistributionIntent.FINISH, finishRecord);
    }
}

