/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.UpstreamRecoveryTracker;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

/**
 * Static helpers that assemble the checkpoint barrier handler and the checkpointed input gates
 * from a task's input gates and source inputs.
 */
@Internal
public class InputProcessorUtil {

    /**
     * Builds one {@link CheckpointedInputGate} per entry of {@code inputGates}.
     *
     * <p>The checkpoint gauges of {@code barrierHandler} are registered on {@code
     * taskIOMetricGroup} before anything else. Every list of gates is then turned into a single
     * {@link InputGate} via {@code InputGateUtil.createInputGate}, and each resulting gate is
     * wrapped together with the shared barrier handler and mailbox executor.
     *
     * @param mailboxExecutor executor handed to every created gate
     * @param inputGates one list of gates per logical input
     * @param taskIOMetricGroup metric group receiving the checkpoint gauges
     * @param barrierHandler barrier handler shared by all created gates
     * @param config consulted for {@code isGraphContainingLoops()}; when it returns {@code true}
     *     the gate gets {@code UpstreamRecoveryTracker.NO_OP}, otherwise a tracker created with
     *     {@code UpstreamRecoveryTracker.forInputGate}
     * @return the wrapped gates, in the same order as {@code inputGates}
     */
    public static CheckpointedInputGate[] createCheckpointedMultipleInputGate(
            MailboxExecutor mailboxExecutor,
            List<IndexedInputGate>[] inputGates,
            TaskIOMetricGroup taskIOMetricGroup,
            CheckpointBarrierHandler barrierHandler,
            StreamConfig config) {
        registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);

        // Create all unioned gates up front; wrapping only starts once every one of them exists.
        final InputGate[] unionedInputGates = new InputGate[inputGates.length];
        for (int i = 0; i < inputGates.length; i++) {
            unionedInputGates[i] = InputGateUtil.createInputGate(inputGates[i]);
        }

        final CheckpointedInputGate[] checkpointedGates =
                new CheckpointedInputGate[unionedInputGates.length];
        for (int i = 0; i < unionedInputGates.length; i++) {
            final InputGate unionedInputGate = unionedInputGates[i];
            final UpstreamRecoveryTracker recoveryTracker;
            // The loop flag is queried once per gate.
            if (config.isGraphContainingLoops()) {
                recoveryTracker = UpstreamRecoveryTracker.NO_OP;
            } else {
                recoveryTracker = UpstreamRecoveryTracker.forInputGate(unionedInputGate);
            }
            checkpointedGates[i] =
                    new CheckpointedInputGate(
                            unionedInputGate, barrierHandler, mailboxExecutor, recoveryTracker);
        }
        return checkpointedGates;
    }

    /**
     * Creates the {@link CheckpointBarrierHandler} matching the checkpointing mode read from
     * {@code jobConf}.
     *
     * <p>All input gates and source inputs are merged into one array ordered by {@code
     * getInputGateIndex()}. For {@code EXACTLY_ONCE} a {@link SingleCheckpointBarrierHandler} is
     * returned; for {@code AT_LEAST_ONCE} a {@link CheckpointBarrierTracker} is returned.
     *
     * @param toNotifyOnCheckpoint task passed on to the created handler
     * @param jobConf configuration from which the checkpointing mode and the unaligned-checkpoint
     *     flag are read
     * @param config stream config whose configuration supplies {@code
     *     ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH}
     * @param checkpointCoordinator coordinator passed to the handler in the unaligned case
     * @param taskName task name passed to the {@code EXACTLY_ONCE} handler
     * @param inputGates one list of gates per logical input
     * @param sourceInputs additional source inputs taking part in checkpointing
     * @param mailboxExecutor executor used to build the timer-registration callback
     * @param timerService timer service used to build the timer-registration callback
     * @return the barrier handler for the configured mode
     * @throws IllegalStateException if the mode is {@code AT_LEAST_ONCE} while unaligned
     *     checkpoints are enabled
     * @throws UnsupportedOperationException if the mode is neither {@code EXACTLY_ONCE} nor {@code
     *     AT_LEAST_ONCE}
     */
    public static CheckpointBarrierHandler createCheckpointBarrierHandler(
            CheckpointableTask toNotifyOnCheckpoint,
            Configuration jobConf,
            StreamConfig config,
            SubtaskCheckpointCoordinator checkpointCoordinator,
            String taskName,
            List<IndexedInputGate>[] inputGates,
            List<StreamTaskSourceInput<?>> sourceInputs,
            MailboxExecutor mailboxExecutor,
            TimerService timerService) {
        final CheckpointableInput[] inputs =
                Stream.<CheckpointableInput>concat(
                                Arrays.stream(inputGates).flatMap(Collection::stream),
                                sourceInputs.stream())
                        .sorted(Comparator.comparing(CheckpointableInput::getInputGateIndex))
                        .toArray(CheckpointableInput[]::new);

        final Clock clock = SystemClock.getInstance();
        final CheckpointingMode checkpointingMode =
                CheckpointingOptions.getCheckpointingMode(jobConf);

        switch (checkpointingMode) {
            case EXACTLY_ONCE:
                // Sizes are accumulated as long and narrowed to int afterwards.
                long channelInfoCount = 0L;
                for (CheckpointableInput input : inputs) {
                    channelInfoCount += input.getChannelInfos().size();
                }
                return createBarrierHandler(
                        toNotifyOnCheckpoint,
                        jobConf,
                        config,
                        checkpointCoordinator,
                        taskName,
                        mailboxExecutor,
                        timerService,
                        inputs,
                        clock,
                        (int) channelInfoCount);
            case AT_LEAST_ONCE:
                if (CheckpointingOptions.isUnalignedCheckpointEnabled(jobConf)) {
                    throw new IllegalStateException(
                            "Cannot use unaligned checkpoints with AT_LEAST_ONCE checkpointing mode");
                }
                int numInputChannels = 0;
                for (CheckpointableInput input : inputs) {
                    numInputChannels += input.getNumberOfInputChannels();
                }
                return new CheckpointBarrierTracker(
                        numInputChannels,
                        toNotifyOnCheckpoint,
                        clock,
                        config.getConfiguration()
                                .get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
            default:
                throw new UnsupportedOperationException(
                        "Unrecognized Checkpointing Mode: " + checkpointingMode);
        }
    }

    /**
     * Creates the {@link SingleCheckpointBarrierHandler} used for {@code EXACTLY_ONCE}: the
     * {@code aligned} variant unless unaligned checkpoints are enabled in {@code jobConf}, in which
     * case the {@code alternating} variant is created.
     */
    private static SingleCheckpointBarrierHandler createBarrierHandler(
            CheckpointableTask toNotifyOnCheckpoint,
            Configuration jobConf,
            StreamConfig config,
            SubtaskCheckpointCoordinator checkpointCoordinator,
            String taskName,
            MailboxExecutor mailboxExecutor,
            TimerService timerService,
            CheckpointableInput[] inputs,
            Clock clock,
            int numberOfChannels) {
        final boolean enableCheckpointAfterTasksFinished =
                config.getConfiguration()
                        .get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH);
        final boolean unalignedEnabled = CheckpointingOptions.isUnalignedCheckpointEnabled(jobConf);

        if (!unalignedEnabled) {
            return SingleCheckpointBarrierHandler.aligned(
                    taskName,
                    toNotifyOnCheckpoint,
                    clock,
                    numberOfChannels,
                    BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService),
                    enableCheckpointAfterTasksFinished,
                    inputs);
        }
        // Only the alternating handler additionally receives the checkpoint coordinator.
        return SingleCheckpointBarrierHandler.alternating(
                taskName,
                toNotifyOnCheckpoint,
                checkpointCoordinator,
                clock,
                numberOfChannels,
                BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService),
                enableCheckpointAfterTasksFinished,
                inputs);
    }

    /**
     * Registers two gauges on {@code taskIOMetricGroup}, backed by the handler's {@code
     * getAlignmentDurationNanos} and {@code getCheckpointStartDelayNanos} accessors.
     */
    private static void registerCheckpointMetrics(
            TaskIOMetricGroup taskIOMetricGroup, CheckpointBarrierHandler barrierHandler) {
        taskIOMetricGroup.gauge(
                "checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);
        taskIOMetricGroup.gauge(
                "checkpointStartDelayNanos", barrierHandler::getCheckpointStartDelayNanos);
    }
}

