/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.clock;

import io.camunda.zeebe.engine.processing.distribution.CommandDistributionBehavior;
import io.camunda.zeebe.engine.processing.streamprocessor.DistributedTypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.value.clock.ClockRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.ClockIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.stream.api.SideEffectProducer;
import io.camunda.zeebe.stream.api.StreamClock;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
import java.time.Instant;

public final class ClockProcessor
implements DistributedTypedRecordProcessor<ClockRecord> {
    private final SideEffectWriter sideEffectWriter;
    private final StateWriter stateWriter;
    private final KeyGenerator keyGenerator;
    private final StreamClock.ControllableStreamClock clock;
    private final CommandDistributionBehavior commandDistributionBehavior;
    private final TypedResponseWriter responseWriter;
    private final TypedRejectionWriter rejectionWriter;

    public ClockProcessor(Writers writers, KeyGenerator keyGenerator, StreamClock.ControllableStreamClock clock, CommandDistributionBehavior commandDistributionBehavior) {
        this.sideEffectWriter = writers.sideEffect();
        this.stateWriter = writers.state();
        this.keyGenerator = keyGenerator;
        this.responseWriter = writers.response();
        this.rejectionWriter = writers.rejection();
        this.clock = clock;
        this.commandDistributionBehavior = commandDistributionBehavior;
    }

    @Override
    public void processNewCommand(TypedRecord<ClockRecord> command) {
        ClockIntent intent = (ClockIntent)command.getIntent();
        ClockRecord clockRecord = (ClockRecord)command.getValue();
        if (intent == ClockIntent.PIN && clockRecord.getTime() < 0L) {
            String rejectionMessage = "Expected pin time to be not negative but it was %d".formatted(clockRecord.getTime());
            this.rejectionWriter.appendRejection(command, RejectionType.INVALID_ARGUMENT, rejectionMessage);
            this.responseWriter.writeRejectionOnCommand(command, RejectionType.INVALID_ARGUMENT, rejectionMessage);
            return;
        }
        long eventKey = this.keyGenerator.nextKey();
        ClockIntent resultIntent = this.followUpIntent(intent);
        this.applyClockModification(eventKey, intent, resultIntent, clockRecord);
        if (command.hasRequestMetadata()) {
            this.responseWriter.writeEventOnCommand(eventKey, (Intent)resultIntent, (UnpackedObject)clockRecord, command);
        }
        this.commandDistributionBehavior.withKey(eventKey).unordered().distribute(command);
    }

    @Override
    public void processDistributedCommand(TypedRecord<ClockRecord> command) {
        ClockIntent commandIntent = (ClockIntent)command.getIntent();
        ClockIntent resultIntent = this.followUpIntent(commandIntent);
        this.applyClockModification(command.getKey(), commandIntent, resultIntent, (ClockRecord)command.getValue());
        this.commandDistributionBehavior.acknowledgeCommand(command);
    }

    private void applyClockModification(long key, ClockIntent commandIntent, ClockIntent resultIntent, ClockRecord clockRecord) {
        SideEffectProducer sideEffect = this.clockModification(commandIntent, clockRecord);
        this.sideEffectWriter.appendSideEffect(sideEffect);
        this.stateWriter.appendFollowUpEvent(key, (Intent)resultIntent, (RecordValue)clockRecord);
    }

    private ClockIntent followUpIntent(ClockIntent intent) {
        return switch (intent) {
            default -> throw new MatchException(null, null);
            case ClockIntent.PIN -> ClockIntent.PINNED;
            case ClockIntent.RESET -> ClockIntent.RESETTED;
            case ClockIntent.RESETTED, ClockIntent.PINNED -> throw new IllegalStateException("Expected a command intent, but got " + intent.name());
        };
    }

    private SideEffectProducer clockModification(ClockIntent intent, ClockRecord value) {
        return switch (intent) {
            default -> throw new MatchException(null, null);
            case ClockIntent.PIN -> {
                Instant pinnedAt = Instant.ofEpochMilli(value.getTime());
                yield () -> {
                    this.clock.pinAt(pinnedAt);
                    return true;
                };
            }
            case ClockIntent.RESET -> () -> {
                this.clock.reset();
                return true;
            };
            case ClockIntent.RESETTED, ClockIntent.PINNED -> throw new IllegalStateException("Expected a command intent, but got " + intent.name());
        };
    }
}

