/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.MessageTimeToLiveChecker;
import io.camunda.zeebe.engine.processing.message.PendingMessageSubscriptionChecker;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ScheduledTaskState;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService;
import io.camunda.zeebe.stream.api.scheduling.Task;
import java.time.Duration;
import java.util.function.Supplier;

public final class MessageObserver
implements StreamProcessorLifecycleAware {
    public static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(10L);
    public static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30L);
    private final SubscriptionCommandSender subscriptionCommandSender;
    private final Supplier<ScheduledTaskState> scheduledTaskStateFactory;
    private final PendingMessageSubscriptionState pendingState;
    private final int messagesTtlCheckerBatchLimit;
    private final Duration messagesTtlCheckerInterval;
    private final boolean enableMessageTtlCheckerAsync;

    public MessageObserver(Supplier<ScheduledTaskState> scheduledTaskStateFactory, PendingMessageSubscriptionState pendingState, SubscriptionCommandSender subscriptionCommandSender, Duration messagesTtlCheckerInterval, int messagesTtlCheckerBatchLimit, boolean enableMessageTtlCheckerAsync) {
        this.subscriptionCommandSender = subscriptionCommandSender;
        this.scheduledTaskStateFactory = scheduledTaskStateFactory;
        this.pendingState = pendingState;
        this.messagesTtlCheckerInterval = messagesTtlCheckerInterval;
        this.messagesTtlCheckerBatchLimit = messagesTtlCheckerBatchLimit;
        this.enableMessageTtlCheckerAsync = enableMessageTtlCheckerAsync;
    }

    public void onRecovered(ReadonlyStreamProcessorContext context) {
        this.scheduleMessageTtlChecker(context);
        this.schedulePendingMessageSubscriptionChecker(context);
    }

    private void scheduleMessageTtlChecker(ReadonlyStreamProcessorContext context) {
        ProcessingScheduleService scheduleService = context.getScheduleService();
        MessageState messageState = this.scheduledTaskStateFactory.get().getMessageState();
        MessageTimeToLiveChecker timeToLiveChecker = new MessageTimeToLiveChecker(this.messagesTtlCheckerInterval, this.messagesTtlCheckerBatchLimit, this.enableMessageTtlCheckerAsync, scheduleService, messageState);
        if (this.enableMessageTtlCheckerAsync) {
            scheduleService.runDelayedAsync(this.messagesTtlCheckerInterval, (Task)timeToLiveChecker);
        } else {
            scheduleService.runDelayed(this.messagesTtlCheckerInterval, (Task)timeToLiveChecker);
        }
    }

    private void schedulePendingMessageSubscriptionChecker(ReadonlyStreamProcessorContext context) {
        ProcessingScheduleService scheduleService = context.getScheduleService();
        PendingMessageSubscriptionChecker pendingSubscriptionChecker = new PendingMessageSubscriptionChecker(this.subscriptionCommandSender, this.pendingState, SUBSCRIPTION_TIMEOUT.toMillis());
        scheduleService.runAtFixedRate(SUBSCRIPTION_CHECK_INTERVAL, (Runnable)pendingSubscriptionChecker);
    }
}

