/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.MessageCorrelator;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.MessageCorrelationState;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.MessageSubscription;
import io.camunda.zeebe.engine.state.message.RequestData;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageCorrelationRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageCorrelationIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.InstantSource;
import org.agrona.DirectBuffer;

public final class MessageSubscriptionCorrelateProcessor
implements TypedRecordProcessor<MessageSubscriptionRecord> {
    private static final String NO_SUBSCRIPTION_FOUND_MESSAGE = "Expected to correlate subscription for element with key '%d' and message name '%s', but no such message subscription exists";
    private static final String SUBSCRIPTION_ACKNOWLEDGE_ERROR_MESSAGE = "Expected to acknowledge correlating message with key '%d' to subscription with key '%d'";
    private static final String SUBSCRIPTION_ALREADY_CORRELATING_AGAIN_MESSAGE = "but the subscription is already correlating to another message with key '%d'";
    private static final String SUBSCRIPTION_ALREADY_CORRELATED_MESSAGE = "but the subscription has already been correlated'";
    private final MessageSubscriptionState subscriptionState;
    private final MessageCorrelationState messageCorrelationState;
    private final MessageCorrelator messageCorrelator;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;
    private final TypedResponseWriter responseWriter;

    public MessageSubscriptionCorrelateProcessor(int partitionId, MessageState messageState, MessageCorrelationState messageCorrelationState, MessageSubscriptionState subscriptionState, SubscriptionCommandSender commandSender, Writers writers, InstantSource clock) {
        this.subscriptionState = subscriptionState;
        this.messageCorrelationState = messageCorrelationState;
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        this.responseWriter = writers.response();
        this.messageCorrelator = new MessageCorrelator(partitionId, messageState, commandSender, this.stateWriter, writers.sideEffect(), clock);
    }

    @Override
    public void processRecord(TypedRecord<MessageSubscriptionRecord> record) {
        MessageSubscriptionRecord command = (MessageSubscriptionRecord)record.getValue();
        MessageSubscription subscription = this.subscriptionState.get(command.getElementInstanceKey(), command.getMessageNameBuffer());
        if (subscription == null) {
            String reason = MessageSubscriptionCorrelateProcessor.formatNoSubscriptionFoundReason(record);
            this.rejectionWriter.appendRejection(record, RejectionType.NOT_FOUND, reason);
            return;
        }
        if (subscription.getRecord().getMessageKey() != ((MessageSubscriptionRecord)record.getValue()).getMessageKey()) {
            String reason = MessageSubscriptionCorrelateProcessor.formatSubscriptionAlreadyCorrelatingAgainReason(record, subscription);
            this.rejectionWriter.appendRejection(record, RejectionType.INVALID_STATE, reason);
            return;
        }
        if (!subscription.isCorrelating()) {
            String reason = MessageSubscriptionCorrelateProcessor.formatSubscriptionAlreadyCorrelatedReason(record, subscription);
            this.rejectionWriter.appendRejection(record, RejectionType.INVALID_STATE, reason);
            return;
        }
        MessageSubscriptionRecord messageSubscription = subscription.getRecord();
        this.stateWriter.appendFollowUpEvent(subscription.getKey(), (Intent)MessageSubscriptionIntent.CORRELATED, (RecordValue)messageSubscription);
        this.writeCorrelationResponse(record, messageSubscription);
        if (!messageSubscription.isInterrupting()) {
            this.messageCorrelator.correlateNextMessage(subscription.getKey(), messageSubscription);
        }
    }

    private static String formatNoSubscriptionFoundReason(TypedRecord<MessageSubscriptionRecord> record) {
        return String.format(NO_SUBSCRIPTION_FOUND_MESSAGE, ((MessageSubscriptionRecord)record.getValue()).getElementInstanceKey(), BufferUtil.bufferAsString((DirectBuffer)((MessageSubscriptionRecord)record.getValue()).getMessageNameBuffer()));
    }

    private static String formatSubscriptionAlreadyCorrelatingAgainReason(TypedRecord<MessageSubscriptionRecord> record, MessageSubscription subscription) {
        return "%s %s".formatted(SUBSCRIPTION_ACKNOWLEDGE_ERROR_MESSAGE.formatted(((MessageSubscriptionRecord)record.getValue()).getMessageKey(), subscription.getKey()), SUBSCRIPTION_ALREADY_CORRELATING_AGAIN_MESSAGE.formatted(subscription.getRecord().getMessageKey()));
    }

    private static String formatSubscriptionAlreadyCorrelatedReason(TypedRecord<MessageSubscriptionRecord> record, MessageSubscription subscription) {
        return "%s %s".formatted(SUBSCRIPTION_ACKNOWLEDGE_ERROR_MESSAGE.formatted(((MessageSubscriptionRecord)record.getValue()).getMessageKey(), subscription.getKey()), SUBSCRIPTION_ALREADY_CORRELATED_MESSAGE);
    }

    private void writeCorrelationResponse(TypedRecord<MessageSubscriptionRecord> record, MessageSubscriptionRecord messageSubscription) {
        long messageKey = messageSubscription.getMessageKey();
        if (this.messageCorrelationState.existsRequestDataForMessageKey(messageKey)) {
            RequestData requestData = this.messageCorrelationState.getRequestData(messageKey);
            MessageCorrelationRecord messageCorrelationRecord = new MessageCorrelationRecord().setName(messageSubscription.getMessageName()).setCorrelationKey(messageSubscription.getCorrelationKey()).setVariables(messageSubscription.getVariablesBuffer()).setTenantId(messageSubscription.getTenantId()).setMessageKey(messageKey).setProcessInstanceKey(messageSubscription.getProcessInstanceKey());
            this.stateWriter.appendFollowUpEvent(messageKey, (Intent)MessageCorrelationIntent.CORRELATED, (RecordValue)messageCorrelationRecord);
            this.responseWriter.writeResponse(((MessageSubscriptionRecord)record.getValue()).getMessageKey(), (Intent)MessageCorrelationIntent.CORRELATED, (UnpackedObject)messageCorrelationRecord, ValueType.MESSAGE_CORRELATION, requestData.getRequestId(), requestData.getRequestStreamId());
        }
    }
}

